You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/04 02:00:12 UTC

[inlong] branch branch-1.5 updated (ee17285f0 -> 44feb7517)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from ee17285f0 [INLONG-7100][Manager][Sort][Dashboard] Support partition key in Hudi sink (#7101)
     new aa6c08099 [INLONG-7130][Manager] Fix null JDBC URL when mysql stream sink init (#7132)
     new 44feb7517 [INLONG-7135][Manager] Support the connection test of data nodes and clusters (#7136)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../service/cluster/AbstractClusterOperator.java   |  8 ++++-
 .../service/cluster/InlongClusterOperator.java     | 10 ++++++-
 .../service/cluster/InlongClusterService.java      |  8 +++++
 .../service/cluster/InlongClusterServiceImpl.java  | 14 ++++++++-
 .../service/cluster/PulsarClusterOperator.java     | 22 ++++++++++++--
 .../service/node/AbstractDataNodeOperator.java     |  6 ++++
 .../manager/service/node/DataNodeOperator.java     |  8 +++++
 .../manager/service/node/DataNodeServiceImpl.java  | 28 ++----------------
 .../node/es/ElasticsearchDataNodeOperator.java     | 34 ++++++++++++++++++++++
 .../service/node/hive/HiveDataNodeOperator.java    | 22 ++++++++++++++
 .../service/resource/sink/es/ElasticsearchApi.java |  2 +-
 .../resource/sink/mysql/MySQLResourceOperator.java | 24 ++++++++++++++-
 .../manager/web/controller/DataNodeController.java |  2 +-
 .../web/controller/InlongClusterController.java    |  6 ++++
 14 files changed, 161 insertions(+), 33 deletions(-)


[inlong] 02/02: [INLONG-7135][Manager] Support the connection test of data nodes and clusters (#7136)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 44feb7517d182085f824064fcc0fdc411c289073
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Tue Jan 3 20:20:36 2023 +0800

    [INLONG-7135][Manager] Support the connection test of data nodes and clusters (#7136)
---
 .../service/cluster/AbstractClusterOperator.java   |  8 ++++-
 .../service/cluster/InlongClusterOperator.java     | 10 ++++++-
 .../service/cluster/InlongClusterService.java      |  8 +++++
 .../service/cluster/InlongClusterServiceImpl.java  | 14 ++++++++-
 .../service/cluster/PulsarClusterOperator.java     | 22 ++++++++++++--
 .../service/node/AbstractDataNodeOperator.java     |  6 ++++
 .../manager/service/node/DataNodeOperator.java     |  8 +++++
 .../manager/service/node/DataNodeServiceImpl.java  | 28 ++----------------
 .../node/es/ElasticsearchDataNodeOperator.java     | 34 ++++++++++++++++++++++
 .../service/node/hive/HiveDataNodeOperator.java    | 22 ++++++++++++++
 .../service/resource/sink/es/ElasticsearchApi.java |  2 +-
 .../manager/web/controller/DataNodeController.java |  2 +-
 .../web/controller/InlongClusterController.java    |  6 ++++
 13 files changed, 138 insertions(+), 32 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
index ace15bf67..d788c505e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
@@ -20,10 +20,10 @@ package org.apache.inlong.manager.service.cluster;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -77,4 +77,10 @@ public abstract class AbstractClusterOperator implements InlongClusterOperator {
         }
     }
 
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        String template = ErrorCodeEnum.CLUSTER_TYPE_NOT_SUPPORTED.getMessage(); // default: no probe for this type
+        throw new BusinessException(String.format(template, request.getType()));
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java
index dadd500ea..912279ea1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java
@@ -17,9 +17,9 @@
 
 package org.apache.inlong.manager.service.cluster;
 
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 
 /**
  * Interface of the inlong cluster operator.
@@ -63,4 +63,12 @@ public interface InlongClusterOperator {
      */
     void updateOpt(ClusterRequest request, String operator);
 
+    /**
+     * Test whether a connection can be established to the cluster described by the request.
+     *
+     * @param request request of the cluster
+     * @return whether the connection is successful; implementations may throw BusinessException on failure
+     */
+    Boolean testConnection(ClusterRequest request);
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 289a77c56..8ecccbc8b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -429,4 +429,12 @@ public interface InlongClusterService {
      */
     String getAllConfig(String clusterName, String md5);
 
+    /**
+     * Test whether the connection to the cluster described by the request can be successfully established.
+     *
+     * @param request cluster request describing the connection to test
+     * @return {@code true} if the connection was established, {@code false} otherwise
+     */
+    Boolean testConnection(ClusterRequest request);
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index a32e1d6f5..a76ec6dcd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -25,6 +25,7 @@ import com.google.gson.Gson;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
@@ -33,7 +34,6 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
 import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
@@ -1649,6 +1649,18 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         return configJson;
     }
 
+    /** Delegates the connection test to the operator registered for the request's cluster type. */
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        LOGGER.info("begin test connection for: {}", request);
+        String type = request.getType();
+        InlongClusterOperator clusterOperator = clusterOperatorFactory.getInstance(type);
+        Boolean result = clusterOperator.testConnection(request);
+        // null-safe: a null result is logged as failed instead of throwing on unboxing
+        LOGGER.info("connection [{}] for: {}", Boolean.TRUE.equals(result) ? "success" : "failed", request);
+        return result;
+    }
+
     /**
      * Remove cluster tag from the given cluster entity.
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index 3f7770d40..a4c98bb11 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -22,13 +22,15 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -86,4 +88,20 @@ public class PulsarClusterOperator extends AbstractClusterOperator {
         }
     }
 
+    /** Creates a PulsarAdmin for the requested cluster to test it; the token is never logged or echoed. */
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        PulsarClusterRequest pulsarRequest = (PulsarClusterRequest) request;
+        PulsarClusterInfo pulsarInfo = new PulsarClusterInfo();
+        CommonBeanUtils.copyProperties(pulsarRequest, pulsarInfo);
+        try (PulsarAdmin ignored = PulsarUtils.getPulsarAdmin(pulsarInfo)) {
+            LOGGER.info("pulsar connection not null - connection success for adminUrl={}", pulsarInfo.getAdminUrl());
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("pulsar connection failed for adminUrl=%s", pulsarInfo.getAdminUrl());
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
index 51eeb91f6..91fae43a8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
@@ -85,4 +85,10 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator {
     public Map<String, String> parse2SinkParams(DataNodeInfo info) {
         return JsonUtils.parseObject(info.getExtParams(), HashMap.class);
     }
+
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String template = ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED.getMessage(); // default: no probe for this type
+        throw new BusinessException(String.format(template, request.getType()));
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
index aeae290b7..14c8d35a0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
@@ -71,4 +71,12 @@ public interface DataNodeOperator {
      * @return Sink params
      */
     Map<String, String> parse2SinkParams(DataNodeInfo info);
+
+    /**
+     * Test whether a connection can be established to the data node described by the request.
+     * @param request request of the data node
+     * @return whether the connection is successful; implementations may throw BusinessException on failure
+     */
+    Boolean testConnection(DataNodeRequest request);
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index 13b613c77..60935ed54 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -19,13 +19,10 @@ package org.apache.inlong.manager.service.node;
 
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
-
-import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.UserTypeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
 import org.apache.inlong.manager.pojo.common.PageResult;
@@ -34,14 +31,12 @@ import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 import org.apache.inlong.manager.pojo.user.UserInfo;
-import org.apache.inlong.manager.service.resource.sink.hive.HiveJdbcUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
-import java.sql.Connection;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -341,28 +336,11 @@ public class DataNodeServiceImpl implements DataNodeService {
         LOGGER.info("begin test connection for: {}", request);
         String type = request.getType();
 
-        Boolean result = false;
-        if (DataNodeType.HIVE.equals(type)) {
-            result = testHiveConnection(request);
-        }
-
+        // according to the data node type, test connection
+        DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
+        Boolean result = dataNodeOperator.testConnection(request);
         LOGGER.info("connection [{}] for: {}", result ? "success" : "failed", request);
         return result;
     }
 
-    /**
-     * Test connection for Hive
-     */
-    private Boolean testHiveConnection(DataNodeRequest request) {
-        String url = request.getUrl();
-        Preconditions.checkNotNull(url, "connection url cannot be empty");
-        try (Connection ignored = HiveJdbcUtils.getConnection(url, request.getUsername(), request.getToken())) {
-            LOGGER.info("hive connection not null - connection success");
-            return true;
-        } catch (Exception e) {
-            LOGGER.error("hive connection failed: {}", e.getMessage());
-            return false;
-        }
-    }
-
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
index 699b83a01..e07338f09 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,6 +31,9 @@ import org.apache.inlong.manager.pojo.node.es.ElasticsearchDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.es.ElasticsearchDataNodeInfo;
 import org.apache.inlong.manager.pojo.node.es.ElasticsearchDataNodeRequest;
 import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
+import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchConfig;
+import org.elasticsearch.client.RequestOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -82,4 +86,34 @@ public class ElasticsearchDataNodeOperator extends AbstractDataNodeOperator {
         LOGGER.debug("success to get elasticsearch data node from entity");
         return info;
     }
+
+    /**
+     * Pings Elasticsearch at the request's url, enabling auth only when a username is given.
+     * The password is deliberately kept out of log and exception messages.
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String url = request.getUrl();
+        String username = request.getUsername();
+        Preconditions.checkNotNull(url, "connection url cannot be empty");
+        ElasticsearchApi client = new ElasticsearchApi();
+        ElasticsearchConfig config = new ElasticsearchConfig();
+        if (StringUtils.isNotEmpty(username)) {
+            config.setAuthEnable(true);
+            config.setUsername(username);
+            config.setPassword(request.getToken());
+        }
+        config.setHosts(url);
+        client.setEsConfig(config);
+        try {
+            boolean result = client.getEsClient().ping(RequestOptions.DEFAULT);
+            LOGGER.info("elasticsearch connection is {} for url={}, username={}", result, url, username);
+            return result;
+        } catch (Exception e) {
+            String errMsg = String.format("elasticsearch connection failed for url=%s, username=%s", url, username);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
index 7cb25bd5f..6adbe0b1a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo;
 import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
 import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.hive.HiveJdbcUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.sql.Connection;
+
 @Service
 public class HiveDataNodeOperator extends AbstractDataNodeOperator {
 
@@ -84,4 +88,22 @@ public class HiveDataNodeOperator extends AbstractDataNodeOperator {
         }
     }
 
+    /** Opens (and immediately closes) a Hive JDBC connection; the password is never logged or echoed. */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String url = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(url, "connection url cannot be empty");
+        try (Connection ignored = HiveJdbcUtils.getConnection(url, username, password)) {
+            LOGGER.info("hive connection not null - connection success for url={}, username={}", url, username);
+            return true;
+        } catch (Exception e) {
+            // keep the secret out of the message: it is both logged and carried by the BusinessException
+            String errMsg = String.format("hive connection failed for url=%s, username=%s", url, username);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
index 41309e6de..43b187ec2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
@@ -227,7 +227,7 @@ public class ElasticsearchApi {
      *
      * @return RestHighLevelClient
      */
-    private RestHighLevelClient getEsClient() {
+    public RestHighLevelClient getEsClient() {
         return esConfig.highLevelClient();
     }
 
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
index 641abc9b9..33ddba8c8 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
@@ -118,7 +118,7 @@ public class DataNodeController {
 
     @PostMapping("/node/testConnection")
     @ApiOperation(value = "Test connection for data node")
-    public Response<Boolean> testConnection(@Validated @RequestBody DataNodeRequest request) {
+    public Response<Boolean> testConnection(@RequestBody DataNodeRequest request) {
         return Response.success(dataNodeService.testConnection(request));
     }
 
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
index af822e703..3ea4634d3 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
@@ -235,4 +235,10 @@ public class InlongClusterController {
         String username = LoginUserUtils.getLoginUser().getName();
         return Response.success(clusterService.bindNodeTag(request, username));
     }
+
+    @PostMapping("/cluster/testConnection")
+    @ApiOperation(value = "Test connection for inlong cluster")
+    public Response<Boolean> testConnection(@RequestBody ClusterRequest request) {
+        return Response.success(clusterService.testConnection(request)); // wraps the service's Boolean result
+    }
 }


[inlong] 01/02: [INLONG-7130][Manager] Fix null JDBC URL when mysql stream sink init (#7132)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit aa6c0809927ad60e56f875158fa58ca61cb1b595
Author: haifxu <xh...@gmail.com>
AuthorDate: Tue Jan 3 18:58:25 2023 +0800

    [INLONG-7130][Manager] Fix null JDBC URL when mysql stream sink init (#7132)
---
 .../resource/sink/mysql/MySQLResourceOperator.java | 24 +++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
index 6c859cf2a..fb834e3fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
@@ -18,16 +18,21 @@
 package org.apache.inlong.manager.service.resource.sink.mysql;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLColumnInfo;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.slf4j.Logger;
@@ -53,6 +58,9 @@ public class MySQLResourceOperator implements SinkResourceOperator {
     @Autowired
     private StreamSinkFieldEntityMapper fieldEntityMapper;
 
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
+
     @Override
     public Boolean accept(String sinkType) {
         return SinkType.MYSQL.equals(sinkType);
@@ -90,7 +98,7 @@ public class MySQLResourceOperator implements SinkResourceOperator {
             columnList.add(columnInfo);
         }
 
-        MySQLSinkDTO sinkDTO = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams());
+        MySQLSinkDTO sinkDTO = this.getMysqlInfo(sinkInfo);
         MySQLTableInfo tableInfo = MySQLSinkDTO.getTableInfo(sinkDTO, columnList);
         try (Connection conn = MySQLJdbcUtils.getConnection(sinkDTO.getJdbcUrl(), sinkDTO.getUsername(),
                 sinkDTO.getPassword())) {
@@ -114,4 +122,18 @@ public class MySQLResourceOperator implements SinkResourceOperator {
         LOG.info("success create MySQL table for data sink [" + sinkInfo.getId() + "]");
     }
 
+    /** Parses the sink's ext params; when no JDBC url is set, fills url and password from the data node. */
+    private MySQLSinkDTO getMysqlInfo(SinkInfo sinkInfo) {
+        MySQLSinkDTO sinkDTO = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams());
+        if (StringUtils.isBlank(sinkDTO.getJdbcUrl())) {
+            String nodeName = sinkInfo.getDataNodeName();
+            Preconditions.checkNotEmpty(nodeName, "mysql jdbc url not specified and data node is empty");
+            DataNodeInfo nodeInfo = dataNodeHelper.getDataNodeInfo(nodeName, sinkInfo.getSinkType());
+            CommonBeanUtils.copyProperties(nodeInfo, sinkDTO);
+            sinkDTO.setJdbcUrl(nodeInfo.getUrl());
+            sinkDTO.setPassword(nodeInfo.getToken());
+        }
+        return sinkDTO;
+    }
+
 }
\ No newline at end of file