You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/27 12:15:07 UTC

[inlong] branch master updated: [INLONG-6018][Manager] Using the Hive data node when creating group resources (#6020)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fcc6ee24a [INLONG-6018][Manager] Using the Hive data node when creating group resources (#6020)
fcc6ee24a is described below

commit fcc6ee24a70364d5a232dc782112f1f6917dccb4
Author: woofyzhao <49...@qq.com>
AuthorDate: Tue Sep 27 20:15:01 2022 +0800

    [INLONG-6018][Manager] Using the Hive data node when creating group resources (#6020)
---
 .../resources/mappers/StreamSinkEntityMapper.xml   |  1 +
 .../manager/pojo/node/hive/HiveDataNodeInfo.java   |  3 --
 .../apache/inlong/manager/pojo/sink/SinkInfo.java  |  1 +
 .../service/node/DataNodeOperateHelper.java        | 54 ++++++++++++++++++++++
 .../resource/sink/hive/HiveResourceOperator.java   | 31 ++++++++++++-
 5 files changed, 86 insertions(+), 4 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index bd9b72330..5d1d3ea25 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -324,6 +324,7 @@
         sink.status,
         sink.creator,
         sink.sink_name,
+        sink.data_node_name,
         stream.mq_resource,
         stream.data_type,
         stream.data_separator as sourceSeparator,
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
index 34b50ed06..8602762e4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
@@ -37,9 +37,6 @@ import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 @ApiModel("Hive data node info")
 public class HiveDataNodeInfo extends DataNodeInfo {
 
-    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
-    private String jdbcUrl;
-
     @ApiModelProperty("Version for Hive, such as: 3.2.1")
     private String hiveVersion;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
index 916ae199e..c2d2c3fff 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
@@ -32,6 +32,7 @@ public class SinkInfo {
     private String inlongStreamId;
     private String sinkType;
     private String sinkName;
+    private String dataNodeName;
     private String description;
     private Integer enableCreateResource;
     private String extParams;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
new file mode 100644
index 000000000..a77af7b66
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Data node helper service
+ */
+@Slf4j
+@Service
+public class DataNodeOperateHelper {
+
+    @Autowired
+    private DataNodeEntityMapper dataNodeMapper;
+    @Autowired
+    private DataNodeOperatorFactory operatorFactory;
+
+    /**
+     * Get data node info by name and type
+     */
+    public DataNodeInfo getDataNodeInfo(String dataNodeName, String dataNodeType) {
+        DataNodeEntity entity = dataNodeMapper.selectByNameAndType(dataNodeName, dataNodeType);
+        if (entity == null) {
+            log.error("data node not found by name={}, type={}", dataNodeName, dataNodeType);
+            throw new BusinessException("data node not found");
+        }
+        DataNodeOperator dataNodeOperator = operatorFactory.getInstance(dataNodeType);
+        DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity);
+        log.debug("success to get data node info by name={}, type={}", dataNodeName, dataNodeType);
+        return dataNodeInfo;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
index c67559140..9542c7e03 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
@@ -18,16 +18,21 @@
 package org.apache.inlong.manager.service.resource.sink.hive;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.hive.HiveColumnInfo;
 import org.apache.inlong.manager.pojo.sink.hive.HiveSinkDTO;
 import org.apache.inlong.manager.pojo.sink.hive.HiveTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.slf4j.Logger;
@@ -51,6 +56,8 @@ public class HiveResourceOperator implements SinkResourceOperator {
     private StreamSinkService sinkService;
     @Autowired
     private StreamSinkFieldEntityMapper sinkFieldMapper;
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
 
     @Override
     public Boolean accept(String sinkType) {
@@ -78,6 +85,28 @@ public class HiveResourceOperator implements SinkResourceOperator {
         this.createTable(sinkInfo);
     }
 
+    private HiveSinkDTO getHiveInfo(SinkInfo sinkInfo) {
+        HiveSinkDTO hiveInfo = new HiveSinkDTO();
+
+        if (StringUtils.isNotBlank(sinkInfo.getExtParams())) {
+            HiveSinkDTO userSinkInfo = HiveSinkDTO.getFromJson(sinkInfo.getExtParams());
+            CommonBeanUtils.copyProperties(userSinkInfo, hiveInfo);
+        }
+
+        // read from data node if not supplied by user
+        if (StringUtils.isBlank(hiveInfo.getJdbcUrl())) {
+            String dataNodeName = sinkInfo.getDataNodeName();
+            Preconditions.checkNotEmpty(dataNodeName, "hive jdbc url not specified and data node is empty");
+            HiveDataNodeInfo dataNodeInfo = (HiveDataNodeInfo) dataNodeHelper.getDataNodeInfo(
+                    dataNodeName, sinkInfo.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, hiveInfo);
+            hiveInfo.setJdbcUrl(dataNodeInfo.getUrl());
+            hiveInfo.setUsername(dataNodeInfo.getUsername());
+            hiveInfo.setPassword(dataNodeInfo.getToken());
+        }
+        return hiveInfo;
+    }
+
     private void createTable(SinkInfo sinkInfo) {
         LOGGER.info("begin to create hive table for sinkId={}", sinkInfo.getId());
 
@@ -97,7 +126,7 @@ public class HiveResourceOperator implements SinkResourceOperator {
         }
 
         try {
-            HiveSinkDTO hiveInfo = HiveSinkDTO.getFromJson(sinkInfo.getExtParams());
+            HiveSinkDTO hiveInfo = this.getHiveInfo(sinkInfo);
             HiveTableInfo tableInfo = HiveSinkDTO.getHiveTableInfo(hiveInfo, columnList);
             String url = hiveInfo.getJdbcUrl();
             String user = hiveInfo.getUsername();