You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/04 11:40:35 UTC
[inlong] 04/04: [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit c7af0fcc0790d10ba0d4180c342aae8715a13f32
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Wed Jan 4 18:48:49 2023 +0800
[INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)
---
.../service/cluster/KafkaClusterOperator.java | 25 +++++++++++++++++++
.../service/cluster/TubeClusterOperator.java | 29 ++++++++++++++++++++--
.../node/ck/ClickHouseDataNodeOperator.java | 22 ++++++++++++++++
.../node/iceberg/IcebergDataNodeOperator.java | 23 +++++++++++++++++
.../service/node/mysql/MySQLDataNodeOperator.java | 23 +++++++++++++++++
.../node/starrocks/StarRocksDataNodeOperator.java | 24 ++++++++++++++++++
6 files changed, 144 insertions(+), 2 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
index 8c4eb37b7..dceac1864 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
@@ -23,17 +23,24 @@ import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterDTO;
import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterRequest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.Properties;
+
/**
* Kafka cluster operator.
*/
@@ -86,4 +93,22 @@ public class KafkaClusterOperator extends AbstractClusterOperator {
}
}
+    /**
+     * Tests whether the Kafka cluster addressed by the request URL can be reached.
+     *
+     * <p>Creates a short-lived Kafka {@link Admin} client against the bootstrap servers taken from
+     * {@code request.getUrl()}, issues a list-topics request with a 30 second timeout, and waits for
+     * the topic names to come back. The client is closed before the method returns.
+     *
+     * @param request cluster request whose URL holds the Kafka bootstrap servers
+     * @return {@code true} when the topic listing completes; failures are reported by exception
+     * @throws BusinessException if creating the client or listing the topics fails for any reason
+     */
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        String bootstrapServers = request.getUrl();
+        Preconditions.checkNotNull(bootstrapServers, "connection url cannot be empty");
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        try (Admin admin = Admin.create(props)) {
+            ListTopicsResult topics = admin.listTopics(new ListTopicsOptions().timeoutMs(30000));
+            // listTopics is asynchronous; get() blocks until the names future completes and
+            // rethrows the failure if the request did not succeed
+            topics.names().get();
+            // log the servers that were tested, not the ListTopicsResult object
+            LOGGER.info("kafka connection not null - connection success for bootstrapServers={}",
+                    bootstrapServers);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("kafka connection failed for bootstrapServers=%s", bootstrapServers);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index cb24bd741..b3cf872fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -19,22 +19,27 @@ package org.apache.inlong.manager.service.cluster;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterDTO;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.service.group.InlongGroupOperator4NoneMQ;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.concurrent.TimeUnit;
+
/**
* TubeMQ cluster operator.
*/
@@ -86,4 +91,24 @@ public class TubeClusterOperator extends AbstractClusterOperator {
return tubeClusterInfo;
}
+    /**
+     * Tests whether the TubeMQ master addressed by the request URL can be reached.
+     *
+     * <p>The host is the text between the last {@code InlongConstants.SLASH} and the last
+     * {@code InlongConstants.COLON} of the URL (from the start of the URL when no slash is present),
+     * and the port is the text after that last colon. Connectivity is then probed through
+     * {@code HttpUtils.checkConnectivity} with a 10 second timeout.
+     *
+     * @param request cluster request whose URL holds the TubeMQ master address
+     * @return the result reported by {@code HttpUtils.checkConnectivity}
+     * @throws BusinessException if the URL cannot be split into host and port, or if the
+     *         connectivity check itself throws
+     */
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        String masterUrl = request.getUrl();
+        // must run before masterUrl is dereferenced below, otherwise a null URL surfaces as a bare NPE
+        Preconditions.checkNotNull(masterUrl, "connection url cannot be empty");
+        try {
+            // parsing lives inside the try so a malformed URL (no colon, non-numeric port)
+            // is reported as the same business error as an unreachable master
+            int hostBeginIndex = masterUrl.lastIndexOf(InlongConstants.SLASH);
+            int portBeginIndex = masterUrl.lastIndexOf(InlongConstants.COLON);
+            String host = masterUrl.substring(hostBeginIndex + 1, portBeginIndex);
+            int port = Integer.parseInt(masterUrl.substring(portBeginIndex + 1));
+
+            boolean result = HttpUtils.checkConnectivity(host, port, 10, TimeUnit.SECONDS);
+            if (result) {
+                LOGGER.info("tube connection not null - connection success for masterUrl={}", masterUrl);
+            } else {
+                // do not claim success when the probe reported the master as unreachable
+                LOGGER.warn("tube connection failed for masterUrl={}", masterUrl);
+            }
+            return result;
+        } catch (Exception e) {
+            String errMsg = String.format("tube connection failed for masterUrl=%s", masterUrl);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
index 08e15de38..7f386416b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeDTO;
import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo;
import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeRequest;
import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseJdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.sql.Connection;
+
@Service
public class ClickHouseDataNodeOperator extends AbstractDataNodeOperator {
@@ -82,4 +86,22 @@ public class ClickHouseDataNodeOperator extends AbstractDataNodeOperator {
}
}
+    /**
+     * Tests whether a ClickHouse connection can be opened with the request's URL and credentials.
+     *
+     * <p>A connection is obtained through {@code ClickHouseJdbcUtils.getConnection} and closed again
+     * as soon as it has been opened. The password (taken from {@code request.getToken()}) is
+     * deliberately kept out of the log output and the exception message.
+     *
+     * @param request data node request carrying the URL, username and token
+     * @return {@code true} when the connection was opened; failures are reported by exception
+     * @throws BusinessException if the connection cannot be opened or closed
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String url = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(url, "connection url cannot be empty");
+        try (Connection ignored = ClickHouseJdbcUtils.getConnection(url, username, password)) {
+            // never log credentials: only url and username are reported
+            LOGGER.info("clickhouse connection not null - connection success for url={}, username={}", url,
+                    username);
+            return true;
+        } catch (Exception e) {
+            // the message is also carried by the thrown exception, so it must not contain the password
+            String errMsg = String.format("clickhouse connection failed for url=%s, username=%s", url, username);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
index 991764610..74506feaa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.service.node.iceberg;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.hive.HiveCatalog;
import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,6 +32,7 @@ import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeDTO;
import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo;
import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeRequest;
import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.iceberg.IcebergCatalogUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -83,4 +86,24 @@ public class IcebergDataNodeOperator extends AbstractDataNodeOperator {
}
}
+    /**
+     * Tests whether the Iceberg catalog behind the request's metastore URI can be reached.
+     *
+     * <p>The request is cast to {@link IcebergDataNodeRequest} to read the warehouse, a
+     * {@link HiveCatalog} is obtained through {@code IcebergCatalogUtils.getCatalog}, and its
+     * namespaces are listed as the actual connectivity probe.
+     *
+     * @param request data node request; must be an {@link IcebergDataNodeRequest}
+     * @return {@code true} when the namespace listing completes; failures are reported by exception
+     * @throws BusinessException if the catalog cannot be created or the namespaces cannot be listed
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        IcebergDataNodeRequest icebergDataNodeRequest = (IcebergDataNodeRequest) request;
+        String metastoreUri = icebergDataNodeRequest.getUrl();
+        String warehouse = icebergDataNodeRequest.getWarehouse();
+        Preconditions.checkNotNull(metastoreUri, "connection url cannot be empty");
+        try {
+            HiveCatalog catalog = IcebergCatalogUtils.getCatalog(metastoreUri, warehouse);
+            // the result is discarded: the call is made only to force a round trip to the metastore
+            catalog.listNamespaces();
+            LOGGER.info("iceberg connection not null - connection success for metastoreUri={}, warehouse={}",
+                    metastoreUri, warehouse);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("iceberg connection failed for metastoreUri=%s, warehouse=%s", metastoreUri,
+                    warehouse);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
index aacd09dd9..9848b23c3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO;
import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeInfo;
import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeRequest;
import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.mysql.MySQLJdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.sql.Connection;
+
/**
* MySQL data node operator
*/
@@ -86,4 +90,23 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator {
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
}
}
+
+    /**
+     * Tests whether a MySQL connection can be opened with the request's JDBC URL and credentials.
+     *
+     * <p>A connection is obtained through {@code MySQLJdbcUtils.getConnection} and closed again as
+     * soon as it has been opened. The password (taken from {@code request.getToken()}) is
+     * deliberately kept out of the log output and the exception message.
+     *
+     * @param request data node request carrying the JDBC URL, username and token
+     * @return {@code true} when the connection was opened; failures are reported by exception
+     * @throws BusinessException if the connection cannot be opened or closed
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String jdbcUrl = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(jdbcUrl, "connection jdbcUrl cannot be empty");
+        try (Connection ignored = MySQLJdbcUtils.getConnection(jdbcUrl, username, password)) {
+            // never log credentials: only jdbcUrl and username are reported
+            LOGGER.info("mysql connection not null - connection success for jdbcUrl={}, username={}",
+                    jdbcUrl, username);
+            return true;
+        } catch (Exception e) {
+            // the message is also carried by the thrown exception, so it must not contain the password
+            String errMsg = String.format("mysql connection failed for jdbcUrl=%s, username=%s", jdbcUrl, username);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
index fb82d894c..e4249ac08 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeDTO;
import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo;
import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeRequest;
import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.starrocks.StarRocksJdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.sql.Connection;
+
@Service
public class StarRocksDataNodeOperator extends AbstractDataNodeOperator {
@@ -83,4 +87,24 @@ public class StarRocksDataNodeOperator extends AbstractDataNodeOperator {
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
}
}
+
+    /**
+     * Tests whether a StarRocks connection can be opened with the request's JDBC URL and credentials.
+     *
+     * <p>A connection is obtained through {@code StarRocksJdbcUtils.getConnection} and closed again
+     * as soon as it has been opened. The password (taken from {@code request.getToken()}) is
+     * deliberately kept out of the log output and the exception message.
+     *
+     * @param request data node request carrying the JDBC URL, username and token
+     * @return {@code true} when the connection was opened; failures are reported by exception
+     * @throws BusinessException if the connection cannot be opened or closed
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String jdbcUrl = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(jdbcUrl, "connection jdbcUrl cannot be empty");
+        try (Connection ignored = StarRocksJdbcUtils.getConnection(jdbcUrl, username, password)) {
+            // never log credentials: only jdbcUrl and username are reported
+            LOGGER.info("starRocks connection not null - connection success for jdbcUrl={}, username={}",
+                    jdbcUrl, username);
+            return true;
+        } catch (Exception e) {
+            // the message is also carried by the thrown exception, so it must not contain the password
+            String errMsg = String.format("starRocks connection failed for jdbcUrl=%s, username=%s",
+                    jdbcUrl, username);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+ }
+
}