You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/06/01 14:03:40 UTC
[linkis] branch dev-1.4.0 updated: Generate spark sql from kafka es mongo (#4588)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new f40bf6f3a Generate spark sql from kafka es mongo (#4588)
f40bf6f3a is described below
commit f40bf6f3a4617359bef46ef1a95f5b3d9dd6abed
Author: ChengJie1053 <18...@163.com>
AuthorDate: Thu Jun 1 22:03:33 2023 +0800
Generate spark sql from kafka es mongo (#4588)
* Generate spark sql from kafka es mongo
* Add DataSourceTypeEnum
---
.../DataSourceTypeEnum.java} | 25 ++--
.../query/common/service/SparkDdlSQlTemplate.java | 33 +++++
.../service/impl/MetadataQueryServiceImpl.java | 153 +++++++++++++++++----
3 files changed, 173 insertions(+), 38 deletions(-)
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/domain/DataSourceTypeEnum.java
similarity index 59%
copy from linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
copy to linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/domain/DataSourceTypeEnum.java
index 62b83fe05..8fc9ff6d0 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/domain/DataSourceTypeEnum.java
@@ -15,21 +15,20 @@
* limitations under the License.
*/
-package org.apache.linkis.metadata.query.common.service;
+package org.apache.linkis.metadata.query.common.domain;
-public class SparkDdlSQlTemplate {
+/** Data source types, each identified by its lower-case type name (see {@link #getValue()}). */
+public enum DataSourceTypeEnum {
+ KAFKA("kafka"),
+ MONGODB("mongodb"),
+ ELASTICSEARCH("elasticsearch");
- public static final String JDBC_DDL_SQL_TEMPLATE =
- "CREATE TEMPORARY TABLE %s "
- + "USING org.apache.spark.sql.jdbc "
- + "OPTIONS ("
- + " url '%s',"
- + " dbtable '%s',"
- + " user '%s',"
- + " password '%s'"
- + ")";
+ /** Lower-case type name, e.g. {@code "kafka"}; fixed per constant, hence final. */
+ private final String value;
- public static final String DML_SQL_TEMPLATE = "INSERT INTO %s SELECT * FROM ${resultTable}";
+ DataSourceTypeEnum(String value) {
+ this.value = value;
+ }
- public static final String DQL_SQL_TEMPLATE = "SELECT %s FROM %s";
+ /**
+ * Returns the lower-case type name of this data source type.
+ *
+ * @return the type name, e.g. {@code "mongodb"}
+ */
+ public String getValue() {
+ return this.value;
+ }
}
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
index 62b83fe05..0b3ee7bc4 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
@@ -19,6 +19,15 @@ package org.apache.linkis.metadata.query.common.service;
public class SparkDdlSQlTemplate {
+ /**
+ * DDL for a Spark temporary table backed by an Elasticsearch index via
+ * {@code org.elasticsearch.spark.sql}. Placeholders, in order: temporary table name,
+ * {@code es.nodes} host, {@code es.port}, index name (written into {@code es.resource} as
+ * {@code <index>/_doc}).
+ *
+ * <p>NOTE(review): the {@code /_doc} type suffix depends on the Elasticsearch / es-hadoop
+ * version in use — confirm it is accepted by the target cluster. Values are inserted between
+ * single quotes without any escaping.
+ */
+ public static final String ES_DDL_SQL_TEMPLATE =
+ "CREATE TEMPORARY TABLE %s "
+ + "USING org.elasticsearch.spark.sql "
+ + "OPTIONS ("
+ + " 'es.nodes' '%s',"
+ + " 'es.port' '%s',"
+ + " 'es.resource' '%s/_doc'"
+ + ")";
+
public static final String JDBC_DDL_SQL_TEMPLATE =
"CREATE TEMPORARY TABLE %s "
+ "USING org.apache.spark.sql.jdbc "
@@ -29,7 +38,31 @@ public class SparkDdlSQlTemplate {
+ " password '%s'"
+ ")";
+ /**
+ * DDL for a Spark temporary table reading a Kafka topic through the {@code kafka} source.
+ * Placeholders, in order: temporary table name, {@code kafka.bootstrap.servers}, topic passed to
+ * {@code subscribe}.
+ */
+ public static final String KAFKA_DDL_SQL_TEMPLATE =
+ "CREATE TEMPORARY TABLE %s "
+ + "USING kafka "
+ + "OPTIONS ("
+ + " 'kafka.bootstrap.servers' '%s',"
+ + " 'subscribe' '%s'"
+ + ")";
+
+ /**
+ * DDL for a Spark temporary table backed by a MongoDB collection through the {@code mongo}
+ * source. Placeholders, in order: temporary table name, connection URI, database, collection.
+ *
+ * <p>NOTE(review): the {@code spark.mongodb.input.*} option names belong to a specific MongoDB
+ * Spark connector generation — confirm against the connector version actually deployed.
+ */
+ public static final String MONGO_DDL_SQL_TEMPLATE =
+ "CREATE TEMPORARY TABLE %s "
+ + "USING mongo "
+ + "OPTIONS ("
+ + " 'spark.mongodb.input.uri' '%s',"
+ + " 'spark.mongodb.input.database' '%s',"
+ + " 'spark.mongodb.input.collection' '%s'"
+ + ")";
public static final String DML_SQL_TEMPLATE = "INSERT INTO %s SELECT * FROM ${resultTable}";
public static final String DQL_SQL_TEMPLATE = "SELECT %s FROM %s";
+
+ /**
+ * Renders {@link #DQL_SQL_TEMPLATE}, i.e. {@code SELECT <columns> FROM <table>}.
+ *
+ * @param columns comma-separated projection, e.g. {@code *}
+ * @param table table to query
+ * @return the formatted query
+ */
+ public static String generateDqlSql(String columns, String table) {
+ return String.format(DQL_SQL_TEMPLATE, columns, table);
+ }
+
+ /**
+ * Renders {@link #DML_SQL_TEMPLATE} for {@code table}; the {@code ${resultTable}} variable is
+ * left verbatim in the output.
+ *
+ * @param table target table of the {@code INSERT INTO}
+ * @return the formatted statement
+ */
+ public static String generateDmlSql(String table) {
+ return String.format(DML_SQL_TEMPLATE, table);
+ }
}
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java
index 1a9b6db7d..5a1608dcb 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java
@@ -23,8 +23,10 @@ import org.apache.linkis.datasourcemanager.common.auth.AuthContext;
import org.apache.linkis.datasourcemanager.common.domain.DataSource;
import org.apache.linkis.datasourcemanager.common.protocol.DsInfoQueryRequest;
import org.apache.linkis.datasourcemanager.common.protocol.DsInfoResponse;
+import org.apache.linkis.datasourcemanager.common.util.json.Json;
import org.apache.linkis.metadata.query.common.MdmConfiguration;
import org.apache.linkis.metadata.query.common.cache.CacheConfiguration;
+import org.apache.linkis.metadata.query.common.domain.DataSourceTypeEnum;
import org.apache.linkis.metadata.query.common.domain.GenerateSqlInfo;
import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
import org.apache.linkis.metadata.query.common.domain.MetaPartitionInfo;
@@ -38,6 +40,7 @@ import org.apache.linkis.rpc.Sender;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHost;
import org.springframework.stereotype.Service;
@@ -379,32 +382,134 @@ public class MetadataQueryServiceImpl implements MetadataQueryService {
DsInfoResponse dsInfoResponse =
queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId);
- if (StringUtils.isNotBlank(dsInfoResponse.getDsType())
- && CacheConfiguration.MYSQL_RELATIONSHIP_LIST
- .getValue()
- .contains(dsInfoResponse.getDsType())) {
- List<MetaColumnInfo> columns =
- invokeMetaMethod(
- dsInfoResponse.getDsType(),
- "getColumns",
- new Object[] {
- dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table
- },
- List.class);
-
- String sqlConnectUrl =
- invokeMetaMethod(
- dsInfoResponse.getDsType(),
- "getSqlConnectUrl",
- new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()},
- String.class);
-
- return getSparkSqlByJdbc(database, table, dsInfoResponse.getParams(), columns, sqlConnectUrl);
+ String dsType = dsInfoResponse.getDsType();
+ if (StringUtils.isNotBlank(dsType)) {
+ // Column lookup is best-effort: with an empty list the JDBC/Mongo/ES generators project "*".
+ List<MetaColumnInfo> columns = new ArrayList<>();
+ try {
+ columns =
+ invokeMetaMethod(
+ dsType,
+ "getColumns",
+ new Object[] {
+ dsInfoResponse.getCreator(),
+ dsInfoResponse.getParams(),
+ database,
+ // For Elasticsearch the lookup is keyed by "_doc" instead of the table name.
+ dsType.equalsIgnoreCase(DataSourceTypeEnum.ELASTICSEARCH.getValue()) ? "_doc" : table
+ },
+ List.class);
+ } catch (Exception e) {
+ // Keep the best-effort behaviour, but log the cause so failures can be diagnosed.
+ logger.warn("Fail to get Sql columns(获取字段列表失败)", e);
+ }
+ if (CacheConfiguration.MYSQL_RELATIONSHIP_LIST.getValue().contains(dsType)) {
+ String sqlConnectUrl =
+ invokeMetaMethod(
+ dsType,
+ "getSqlConnectUrl",
+ new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()},
+ String.class);
+
+ return getSparkSqlByJdbc(
+ database, table, dsInfoResponse.getParams(), columns, sqlConnectUrl);
+ } else if (dsType.equalsIgnoreCase(DataSourceTypeEnum.KAFKA.getValue())) {
+ return getSparkSqlByKafka(table, dsInfoResponse.getParams());
+ } else if (dsType.equalsIgnoreCase(DataSourceTypeEnum.MONGODB.getValue())) {
+ return getSparkSqlByMongo(database, table, dsInfoResponse.getParams(), columns);
+ } else if (dsType.equalsIgnoreCase(DataSourceTypeEnum.ELASTICSEARCH.getValue())) {
+ return getSparkSqlByElasticsearch(table, dsInfoResponse.getParams(), columns);
+ }
+ }
return new GenerateSqlInfo();
}
+ /**
+ * Builds Spark SQL (DDL/DML/DQL) for an Elasticsearch index.
+ *
+ * @param table index name; used both as the Spark temporary table name and as the index in
+ *     {@code es.resource}
+ * @param params connection parameters; {@code elasticUrls} is read either as a {@link List} or as
+ *     a JSON array string of endpoints, and only the first endpoint is used
+ * @param columns columns to project in the DQL; {@code *} when empty
+ * @return the generated SQL statements
+ */
+ public GenerateSqlInfo getSparkSqlByElasticsearch(
+ String table, Map<String, Object> params, List<MetaColumnInfo> columns) {
+ GenerateSqlInfo generateSqlInfo = new GenerateSqlInfo();
+
+ // Same endpoint as the built-in default; also used when the configured list is unparsable or
+ // empty (previously endPoints[0] threw ArrayIndexOutOfBoundsException in that case).
+ String endPoint = "localhost:9200";
+ Object urls = params.getOrDefault("elasticUrls", "[\"localhost:9200\"]");
+ try {
+ List<String> urlList;
+ if (urls instanceof List) {
+ @SuppressWarnings("unchecked")
+ List<String> configuredUrls = (List<String>) urls;
+ urlList = configuredUrls;
+ } else {
+ urlList = Json.fromJson(String.valueOf(urls), List.class, String.class);
+ }
+ // Explicit check instead of `assert`, which is disabled unless the JVM runs with -ea.
+ if (CollectionUtils.isNotEmpty(urlList) && StringUtils.isNotBlank(urlList.get(0))) {
+ endPoint = urlList.get(0);
+ }
+ } catch (Exception e) {
+ logger.warn("Fail to get ElasticSearch urls", e);
+ }
+
+ HttpHost httpHost = HttpHost.create(endPoint);
+ // HttpHost reports -1 when the endpoint has no explicit port; use 9200 (the default endpoint's
+ // port) rather than emitting 'es.port' '-1'.
+ int port = httpHost.getPort() > 0 ? httpHost.getPort() : 9200;
+ String ddl =
+ String.format(
+ SparkDdlSQlTemplate.ES_DDL_SQL_TEMPLATE, table, httpHost.getHostName(), port, table);
+ generateSqlInfo.setDdl(ddl);
+
+ generateSqlInfo.setDml(SparkDdlSQlTemplate.generateDmlSql(table));
+
+ String columnStr = "*";
+ if (CollectionUtils.isNotEmpty(columns)) {
+ columnStr = columns.stream().map(MetaColumnInfo::getName).collect(Collectors.joining(","));
+ }
+
+ generateSqlInfo.setDql(SparkDdlSQlTemplate.generateDqlSql(columnStr, table));
+ return generateSqlInfo;
+ }
+
+ /**
+ * Builds Spark SQL (DDL/DML/DQL) for a MongoDB collection.
+ *
+ * @param database MongoDB database; also appended to the connection URI
+ * @param table collection name; also used as the Spark temporary table name
+ * @param params connection parameters; {@code host} and {@code port} are read (empty if absent)
+ * @param columns columns to project in the DQL; {@code _id} and blank names are skipped, and
+ *     {@code *} is used when nothing is left
+ * @return the generated SQL statements
+ */
+ public GenerateSqlInfo getSparkSqlByMongo(
+ String database, String table, Map<String, Object> params, List<MetaColumnInfo> columns) {
+ GenerateSqlInfo generateSqlInfo = new GenerateSqlInfo();
+ // NOTE(review): only host/port go into the URI; any credentials in params are not used here —
+ // confirm this is intended.
+ String url =
+ String.format(
+ "mongodb://%s:%s/%s",
+ params.getOrDefault("host", ""), params.getOrDefault("port", ""), database);
+
+ String ddl =
+ String.format(SparkDdlSQlTemplate.MONGO_DDL_SQL_TEMPLATE, table, url, database, table);
+ generateSqlInfo.setDdl(ddl);
+
+ generateSqlInfo.setDml(SparkDdlSQlTemplate.generateDmlSql(table));
+
+ String columnStr = "*";
+ if (CollectionUtils.isNotEmpty(columns)) {
+ // "_id" stays out of the projection (as before). If nothing else remains, keep "*" so the
+ // DQL does not degrade into the invalid "SELECT  FROM <table>".
+ String projection =
+ columns.stream()
+ .map(MetaColumnInfo::getName)
+ .filter(name -> StringUtils.isNotBlank(name) && !"_id".equals(name))
+ .collect(Collectors.joining(","));
+ if (!projection.isEmpty()) {
+ columnStr = projection;
+ }
+ }
+
+ generateSqlInfo.setDql(SparkDdlSQlTemplate.generateDqlSql(columnStr, table));
+ return generateSqlInfo;
+ }
+
+ /**
+ * Builds Spark SQL (DDL/DML/DQL) for a Kafka topic.
+ *
+ * @param table topic name; also used as the Spark temporary table name
+ * @param params connection parameters; {@code uris} supplies the bootstrap servers
+ *     ({@code localhost:9092} when absent)
+ * @return the generated SQL statements
+ */
+ public GenerateSqlInfo getSparkSqlByKafka(String table, Map<String, Object> params) {
+ Object bootstrapServers = params.getOrDefault("uris", "localhost:9092");
+ GenerateSqlInfo sqlInfo = new GenerateSqlInfo();
+ sqlInfo.setDdl(
+ String.format(
+ SparkDdlSQlTemplate.KAFKA_DDL_SQL_TEMPLATE,
+ table,
+ String.valueOf(bootstrapServers),
+ table));
+ sqlInfo.setDml(SparkDdlSQlTemplate.generateDmlSql(table));
+ // Project the record payload column `value` cast to STRING.
+ sqlInfo.setDql(SparkDdlSQlTemplate.generateDqlSql("CAST(value AS STRING)", table));
+ return sqlInfo;
+ }
+
public GenerateSqlInfo getSparkSqlByJdbc(
String database,
String table,
@@ -430,16 +535,14 @@ public class MetadataQueryServiceImpl implements MetadataQueryService {
params.getOrDefault("password", ""));
generateSqlInfo.setDdl(ddl);
- String dml = String.format(SparkDdlSQlTemplate.DML_SQL_TEMPLATE, sparkTableName);
- generateSqlInfo.setDml(dml);
+ // Keep DML/DQL on sparkTableName, as before this refactor, so they name the same Spark table as
+ // the DDL (presumably its temporary table name — confirm) rather than the raw source `table`.
+ generateSqlInfo.setDml(SparkDdlSQlTemplate.generateDmlSql(sparkTableName));
String columnStr = "*";
if (CollectionUtils.isNotEmpty(columns)) {
columnStr = columns.stream().map(column -> column.getName()).collect(Collectors.joining(","));
}
- String dql = String.format(SparkDdlSQlTemplate.DQL_SQL_TEMPLATE, columnStr, sparkTableName);
- generateSqlInfo.setDql(dql);
+ generateSqlInfo.setDql(SparkDdlSQlTemplate.generateDqlSql(columnStr, sparkTableName));
return generateSqlInfo;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org