You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/04 11:40:35 UTC

[inlong] 04/04: [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit c7af0fcc0790d10ba0d4180c342aae8715a13f32
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Wed Jan 4 18:48:49 2023 +0800

    [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)
---
 .../service/cluster/KafkaClusterOperator.java      | 25 +++++++++++++++++++
 .../service/cluster/TubeClusterOperator.java       | 29 ++++++++++++++++++++--
 .../node/ck/ClickHouseDataNodeOperator.java        | 22 ++++++++++++++++
 .../node/iceberg/IcebergDataNodeOperator.java      | 23 +++++++++++++++++
 .../service/node/mysql/MySQLDataNodeOperator.java  | 23 +++++++++++++++++
 .../node/starrocks/StarRocksDataNodeOperator.java  | 24 ++++++++++++++++++
 6 files changed, 144 insertions(+), 2 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
index 8c4eb37b7..dceac1864 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
@@ -23,17 +23,24 @@ import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterDTO;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterRequest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.Properties;
+
 /**
  * Kafka cluster operator.
  */
@@ -86,4 +93,34 @@ public class KafkaClusterOperator extends AbstractClusterOperator {
         }
     }
 
+    /**
+     * Tests whether the Kafka cluster in the request is reachable by listing its topic names.
+     *
+     * @param request cluster request whose URL is used as the Kafka bootstrap servers
+     * @return {@code true} if the topic names were fetched successfully
+     * @throws BusinessException if the topic names cannot be fetched for any reason
+     */
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        String bootstrapServers = request.getUrl();
+        Preconditions.checkNotNull(bootstrapServers, "connection url cannot be empty");
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        try (Admin admin = Admin.create(props)) {
+            ListTopicsResult topics = admin.listTopics(new ListTopicsOptions().timeoutMs(30000));
+            // block until the listing completes so that a failure surfaces here as an exception
+            topics.names().get();
+            LOGGER.info("kafka connection not null - connection success for bootstrapServers={}", bootstrapServers);
+            return true;
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // the broad catch would otherwise swallow the interrupt; restore the flag for callers
+                Thread.currentThread().interrupt();
+            }
+            String errMsg = String.format("kafka connection failed for bootstrapServers=%s", bootstrapServers);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index cb24bd741..b3cf872fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -19,22 +19,27 @@ package org.apache.inlong.manager.service.cluster;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterDTO;
 import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.service.group.InlongGroupOperator4NoneMQ;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * TubeMQ cluster operator.
  */
@@ -86,4 +91,39 @@ public class TubeClusterOperator extends AbstractClusterOperator {
         return tubeClusterInfo;
     }
 
+    /**
+     * Tests whether the TubeMQ master in the request accepts connections on its host and port.
+     *
+     * <p>The host is the text between the last {@code InlongConstants.SLASH} (if any) and the last
+     * {@code InlongConstants.COLON} of the URL; the port is the text after that last colon.
+     *
+     * @param request cluster request whose URL is the TubeMQ master address
+     * @return the result of the connectivity check
+     * @throws BusinessException if the URL cannot be parsed or the connectivity check throws
+     */
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        String masterUrl = request.getUrl();
+        // validate before parsing, otherwise a null URL fails with a bare NullPointerException
+        Preconditions.checkNotNull(masterUrl, "connection url cannot be empty");
+        try {
+            // parse inside the try so that a malformed URL is reported like any other failure
+            int hostBeginIndex = masterUrl.lastIndexOf(InlongConstants.SLASH);
+            int portBeginIndex = masterUrl.lastIndexOf(InlongConstants.COLON);
+            String host = masterUrl.substring(hostBeginIndex + 1, portBeginIndex);
+            int port = Integer.parseInt(masterUrl.substring(portBeginIndex + 1));
+            boolean result = HttpUtils.checkConnectivity(host, port, 10, TimeUnit.SECONDS);
+            if (result) {
+                LOGGER.info("tube connection not null - connection success for masterUrl={}", masterUrl);
+            } else {
+                LOGGER.warn("tube connection check returned false for masterUrl={}", masterUrl);
+            }
+            return result;
+        } catch (Exception e) {
+            String errMsg = String.format("tube connection failed for masterUrl=%s", masterUrl);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
index 08e15de38..7f386416b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo;
 import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeRequest;
 import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseJdbcUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.sql.Connection;
+
 @Service
 public class ClickHouseDataNodeOperator extends AbstractDataNodeOperator {
 
@@ -82,4 +86,29 @@ public class ClickHouseDataNodeOperator extends AbstractDataNodeOperator {
         }
     }
 
+    /**
+     * Tests whether a JDBC connection to ClickHouse can be opened with the request's URL and credentials.
+     *
+     * @param request data node request providing the JDBC URL, the username, and the token used as password
+     * @return {@code true} if the connection was opened (it is closed again before returning)
+     * @throws BusinessException if the connection cannot be opened
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String url = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(url, "connection url cannot be empty");
+        try (Connection ignored = ClickHouseJdbcUtils.getConnection(url, username, password)) {
+            // the password is deliberately kept out of the log and error messages
+            LOGGER.info("clickhouse connection not null - connection success for url={}, username={}",
+                    url, username);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("clickhouse connection failed for url=%s, username=%s", url, username);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
index 991764610..74506feaa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.service.node.iceberg;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,6 +32,7 @@ import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo;
 import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeRequest;
 import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.iceberg.IcebergCatalogUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -83,4 +86,31 @@ public class IcebergDataNodeOperator extends AbstractDataNodeOperator {
         }
     }
 
+    /**
+     * Tests the connection by building a {@link HiveCatalog} from the metastore URI and warehouse and listing its namespaces.
+     *
+     * @param request data node request; it is cast to {@link IcebergDataNodeRequest} to read the warehouse
+     * @return {@code true} if the namespaces were listed successfully
+     * @throws BusinessException if the catalog cannot be created or the namespaces cannot be listed
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        IcebergDataNodeRequest icebergDataNodeRequest = (IcebergDataNodeRequest) request;
+        String metastoreUri = icebergDataNodeRequest.getUrl();
+        String warehouse = icebergDataNodeRequest.getWarehouse();
+        Preconditions.checkNotNull(metastoreUri, "connection url cannot be empty");
+        try {
+            HiveCatalog catalog = IcebergCatalogUtils.getCatalog(metastoreUri, warehouse);
+            catalog.listNamespaces();
+            LOGGER.info("iceberg connection not null - connection success for metastoreUri={}, warehouse={}",
+                    metastoreUri, warehouse);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("iceberg connection failed for metastoreUri=%s, warehouse=%s", metastoreUri,
+                    warehouse);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
index aacd09dd9..9848b23c3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeInfo;
 import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeRequest;
 import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.mysql.MySQLJdbcUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.sql.Connection;
+
 /**
  * MySQL data node operator
  */
@@ -86,4 +90,30 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
         }
     }
+
+    /**
+     * Tests whether a JDBC connection to MySQL can be opened with the request's URL and credentials.
+     *
+     * @param request data node request providing the JDBC URL, the username, and the token used as password
+     * @return {@code true} if the connection was opened (it is closed again before returning)
+     * @throws BusinessException if the connection cannot be opened
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String jdbcUrl = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(jdbcUrl, "connection jdbcUrl cannot be empty");
+        try (Connection ignored = MySQLJdbcUtils.getConnection(jdbcUrl, username, password)) {
+            // the password is deliberately kept out of the log and error messages
+            LOGGER.info("mysql connection not null - connection success for jdbcUrl={}, username={}",
+                    jdbcUrl, username);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("mysql connection failed for jdbcUrl=%s, username=%s", jdbcUrl, username);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
index fb82d894c..e4249ac08 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo;
 import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeRequest;
 import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.starrocks.StarRocksJdbcUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.sql.Connection;
+
 @Service
 public class StarRocksDataNodeOperator extends AbstractDataNodeOperator {
 
@@ -83,4 +87,31 @@ public class StarRocksDataNodeOperator extends AbstractDataNodeOperator {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
         }
     }
+
+    /**
+     * Tests whether a JDBC connection to StarRocks can be opened with the request's URL and credentials.
+     *
+     * @param request data node request providing the JDBC URL, the username, and the token used as password
+     * @return {@code true} if the connection was opened (it is closed again before returning)
+     * @throws BusinessException if the connection cannot be opened
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String jdbcUrl = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(jdbcUrl, "connection jdbcUrl cannot be empty");
+        try (Connection ignored = StarRocksJdbcUtils.getConnection(jdbcUrl, username, password)) {
+            // the password is deliberately kept out of the log and error messages
+            LOGGER.info("starRocks connection not null - connection success for jdbcUrl={}, username={}",
+                    jdbcUrl, username);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("starRocks connection failed for jdbcUrl=%s, username=%s",
+                    jdbcUrl, username);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }