You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/06/01 14:03:40 UTC

[linkis] branch dev-1.4.0 updated: Generate spark sql from kafka es mongo (#4588)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new f40bf6f3a Generate spark sql from kafka es mongo (#4588)
f40bf6f3a is described below

commit f40bf6f3a4617359bef46ef1a95f5b3d9dd6abed
Author: ChengJie1053 <18...@163.com>
AuthorDate: Thu Jun 1 22:03:33 2023 +0800

    Generate spark sql from kafka es mongo (#4588)
    
    * Generate spark sql from kafka es mongo
    
    * Add DataSourceTypeEnum
---
 .../DataSourceTypeEnum.java}                       |  25 ++--
 .../query/common/service/SparkDdlSQlTemplate.java  |  33 +++++
 .../service/impl/MetadataQueryServiceImpl.java     | 153 +++++++++++++++++----
 3 files changed, 173 insertions(+), 38 deletions(-)

diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/domain/DataSourceTypeEnum.java
similarity index 59%
copy from linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
copy to linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/domain/DataSourceTypeEnum.java
index 62b83fe05..8fc9ff6d0 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/domain/DataSourceTypeEnum.java
@@ -15,21 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.metadata.query.common.service;
+package org.apache.linkis.metadata.query.common.domain;
 
-public class SparkDdlSQlTemplate {
+public enum DataSourceTypeEnum {
+  KAFKA("kafka"),
+  MONGODB("mongodb"),
+  ELASTICSEARCH("elasticsearch");
 
-  public static final String JDBC_DDL_SQL_TEMPLATE =
-      "CREATE TEMPORARY TABLE %s "
-          + "USING org.apache.spark.sql.jdbc "
-          + "OPTIONS ("
-          + "  url '%s',"
-          + "  dbtable '%s',"
-          + "  user '%s',"
-          + "  password '%s'"
-          + ")";
+  private String value;
 
-  public static final String DML_SQL_TEMPLATE = "INSERT INTO %s SELECT * FROM ${resultTable}";
+  DataSourceTypeEnum(String value) {
+    this.value = value;
+  }
 
-  public static final String DQL_SQL_TEMPLATE = "SELECT %s FROM %s";
+  public String getValue() {
+    return this.value;
+  }
 }
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
index 62b83fe05..0b3ee7bc4 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
@@ -19,6 +19,15 @@ package org.apache.linkis.metadata.query.common.service;
 
 public class SparkDdlSQlTemplate {
 
+  public static final String ES_DDL_SQL_TEMPLATE =
+      "CREATE TEMPORARY TABLE %s "
+          + "USING org.elasticsearch.spark.sql  "
+          + "OPTIONS ("
+          + "  'es.nodes' '%s',"
+          + "  'es.port' '%s',"
+          + "  'es.resource' '%s/_doc'"
+          + ")";
+
   public static final String JDBC_DDL_SQL_TEMPLATE =
       "CREATE TEMPORARY TABLE %s "
           + "USING org.apache.spark.sql.jdbc "
@@ -29,7 +38,31 @@ public class SparkDdlSQlTemplate {
           + "  password '%s'"
           + ")";
 
+  public static final String KAFKA_DDL_SQL_TEMPLATE =
+      "CREATE TEMPORARY TABLE %s "
+          + "USING kafka "
+          + "OPTIONS ("
+          + "  'kafka.bootstrap.servers' '%s',"
+          + "  'subscribe' '%s'"
+          + ")";
+
+  public static final String MONGO_DDL_SQL_TEMPLATE =
+      "CREATE TEMPORARY TABLE %s "
+          + "USING mongo "
+          + "OPTIONS ("
+          + "  'spark.mongodb.input.uri' '%s',"
+          + "  'spark.mongodb.input.database' '%s',"
+          + "  'spark.mongodb.input.collection' '%s'"
+          + ")";
   public static final String DML_SQL_TEMPLATE = "INSERT INTO %s SELECT * FROM ${resultTable}";
 
   public static final String DQL_SQL_TEMPLATE = "SELECT %s FROM %s";
+
+  public static String generateDqlSql(String columns, String table) {
+    return String.format(DQL_SQL_TEMPLATE, columns, table); // SELECT <columns> FROM <table>
+  }
+
+  public static String generateDmlSql(String table) {
+    return String.format(DML_SQL_TEMPLATE, table); // %s = target; ${resultTable} stays literal
+  }
 }
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java
index 1a9b6db7d..5a1608dcb 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java
@@ -23,8 +23,10 @@ import org.apache.linkis.datasourcemanager.common.auth.AuthContext;
 import org.apache.linkis.datasourcemanager.common.domain.DataSource;
 import org.apache.linkis.datasourcemanager.common.protocol.DsInfoQueryRequest;
 import org.apache.linkis.datasourcemanager.common.protocol.DsInfoResponse;
+import org.apache.linkis.datasourcemanager.common.util.json.Json;
 import org.apache.linkis.metadata.query.common.MdmConfiguration;
 import org.apache.linkis.metadata.query.common.cache.CacheConfiguration;
+import org.apache.linkis.metadata.query.common.domain.DataSourceTypeEnum;
 import org.apache.linkis.metadata.query.common.domain.GenerateSqlInfo;
 import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
 import org.apache.linkis.metadata.query.common.domain.MetaPartitionInfo;
@@ -38,6 +40,7 @@ import org.apache.linkis.rpc.Sender;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHost;
 
 import org.springframework.stereotype.Service;
 
@@ -379,32 +382,134 @@ public class MetadataQueryServiceImpl implements MetadataQueryService {
     DsInfoResponse dsInfoResponse =
         queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId);
 
-    if (StringUtils.isNotBlank(dsInfoResponse.getDsType())
-        && CacheConfiguration.MYSQL_RELATIONSHIP_LIST
-            .getValue()
-            .contains(dsInfoResponse.getDsType())) {
-      List<MetaColumnInfo> columns =
-          invokeMetaMethod(
-              dsInfoResponse.getDsType(),
-              "getColumns",
-              new Object[] {
-                dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table
-              },
-              List.class);
-
-      String sqlConnectUrl =
-          invokeMetaMethod(
-              dsInfoResponse.getDsType(),
-              "getSqlConnectUrl",
-              new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()},
-              String.class);
-
-      return getSparkSqlByJdbc(database, table, dsInfoResponse.getParams(), columns, sqlConnectUrl);
+    if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) {
+      List<MetaColumnInfo> columns = new ArrayList<>();
+      try {
+        columns =
+            invokeMetaMethod(
+                dsInfoResponse.getDsType(),
+                "getColumns",
+                new Object[] {
+                  dsInfoResponse.getCreator(),
+                  dsInfoResponse.getParams(),
+                  database,
+                  dsInfoResponse
+                          .getDsType()
+                          .equalsIgnoreCase(DataSourceTypeEnum.ELASTICSEARCH.getValue())
+                      ? "_doc"
+                      : table
+                },
+                List.class);
+      } catch (Exception e) {
+        logger.warn("Fail to get Sql columns(获取字段列表失败)");
+      }
+      if (CacheConfiguration.MYSQL_RELATIONSHIP_LIST
+          .getValue()
+          .contains(dsInfoResponse.getDsType())) {
+        String sqlConnectUrl =
+            invokeMetaMethod(
+                dsInfoResponse.getDsType(),
+                "getSqlConnectUrl",
+                new Object[] {dsInfoResponse.getCreator(), dsInfoResponse.getParams()},
+                String.class);
+
+        return getSparkSqlByJdbc(
+            database, table, dsInfoResponse.getParams(), columns, sqlConnectUrl);
+      } else if (dsInfoResponse.getDsType().equalsIgnoreCase(DataSourceTypeEnum.KAFKA.getValue())) {
+        return getSparkSqlByKafka(table, dsInfoResponse.getParams());
+      } else if (dsInfoResponse
+          .getDsType()
+          .equalsIgnoreCase(DataSourceTypeEnum.MONGODB.getValue())) {
+        return getSparkSqlByMongo(database, table, dsInfoResponse.getParams(), columns);
+      } else if (dsInfoResponse
+          .getDsType()
+          .equalsIgnoreCase(DataSourceTypeEnum.ELASTICSEARCH.getValue())) {
+        return getSparkSqlByElasticsearch(table, dsInfoResponse.getParams(), columns);
+      }
     }
 
     return new GenerateSqlInfo();
   }
 
+  /**
+   * Builds the Spark DDL/DML/DQL for an Elasticsearch index from the first "elasticUrls" entry.
+   * Falls back to localhost:9200 when no endpoint can be read and to port 9200 when it has none.
+   */
+  public GenerateSqlInfo getSparkSqlByElasticsearch(
+      String table, Map<String, Object> params, List<MetaColumnInfo> columns) {
+    GenerateSqlInfo generateSqlInfo = new GenerateSqlInfo();
+    String endPoint = "localhost:9200";
+    Object urls = params.getOrDefault("elasticUrls", "[\"localhost:9200\"]");
+    try {
+      List<String> urlList;
+      if (urls instanceof List) {
+        urlList = (List<String>) urls;
+      } else {
+        urlList = Json.fromJson(String.valueOf(urls), List.class, String.class);
+      }
+      // A missing or empty list keeps the default instead of failing on index 0 below.
+      if (CollectionUtils.isNotEmpty(urlList) && StringUtils.isNotBlank(urlList.get(0))) {
+        endPoint = urlList.get(0);
+      }
+    } catch (Exception e) {
+      logger.warn("Fail to get ElasticSearch urls", e);
+    }
+    HttpHost httpHost = HttpHost.create(endPoint);
+    // HttpHost reports -1 when the endpoint text carries no explicit port.
+    int port = httpHost.getPort() < 0 ? 9200 : httpHost.getPort();
+    generateSqlInfo.setDdl(
+        String.format(
+            SparkDdlSQlTemplate.ES_DDL_SQL_TEMPLATE, table, httpHost.getHostName(), port, table));
+    generateSqlInfo.setDml(SparkDdlSQlTemplate.generateDmlSql(table));
+
+    String columnStr = "*";
+    if (CollectionUtils.isNotEmpty(columns)) {
+      columnStr = columns.stream().map(MetaColumnInfo::getName).collect(Collectors.joining(","));
+    }
+    generateSqlInfo.setDql(SparkDdlSQlTemplate.generateDqlSql(columnStr, table));
+    return generateSqlInfo;
+  }
+
+  /** Builds the Spark DDL/DML/DQL for a MongoDB collection; the DQL column list omits "_id". */
+  public GenerateSqlInfo getSparkSqlByMongo(
+      String database, String table, Map<String, Object> params, List<MetaColumnInfo> columns) {
+    GenerateSqlInfo generateSqlInfo = new GenerateSqlInfo();
+    String url =
+        String.format(
+            "mongodb://%s:%s/%s",
+            params.getOrDefault("host", ""), params.getOrDefault("port", ""), database);
+    generateSqlInfo.setDdl(
+        String.format(SparkDdlSQlTemplate.MONGO_DDL_SQL_TEMPLATE, table, url, database, table));
+    generateSqlInfo.setDml(SparkDdlSQlTemplate.generateDmlSql(table));
+
+    String columnStr = "";
+    if (CollectionUtils.isNotEmpty(columns)) {
+      columnStr =
+          columns.stream()
+              .filter(column -> !column.getName().equals("_id"))
+              .map(MetaColumnInfo::getName)
+              .collect(Collectors.joining(","));
+    }
+    // An empty select list would render as "SELECT  FROM <table>", so fall back to "*".
+    generateSqlInfo.setDql(
+        SparkDdlSQlTemplate.generateDqlSql(columnStr.isEmpty() ? "*" : columnStr, table));
+    return generateSqlInfo;
+  }
+
+  /** Builds the Spark DDL/DML/DQL for a Kafka topic; brokers default to localhost:9092. */
+  public GenerateSqlInfo getSparkSqlByKafka(String table, Map<String, Object> params) {
+    GenerateSqlInfo sqlInfo = new GenerateSqlInfo();
+    String brokers = String.valueOf(params.getOrDefault("uris", "localhost:9092"));
+    // The table argument fills both the temporary table name and the 'subscribe' option.
+    String createSql =
+        String.format(SparkDdlSQlTemplate.KAFKA_DDL_SQL_TEMPLATE, table, brokers, table);
+    sqlInfo.setDdl(createSql);
+    sqlInfo.setDml(SparkDdlSQlTemplate.generateDmlSql(table));
+    sqlInfo.setDql(SparkDdlSQlTemplate.generateDqlSql("CAST(value AS STRING)", table));
+    return sqlInfo;
+  }
+
   public GenerateSqlInfo getSparkSqlByJdbc(
       String database,
       String table,
@@ -430,16 +535,14 @@ public class MetadataQueryServiceImpl implements MetadataQueryService {
             params.getOrDefault("password", ""));
     generateSqlInfo.setDdl(ddl);
 
-    String dml = String.format(SparkDdlSQlTemplate.DML_SQL_TEMPLATE, sparkTableName);
-    generateSqlInfo.setDml(dml);
+    generateSqlInfo.setDml(SparkDdlSQlTemplate.generateDmlSql(table));
 
     String columnStr = "*";
     if (CollectionUtils.isNotEmpty(columns)) {
       columnStr = columns.stream().map(column -> column.getName()).collect(Collectors.joining(","));
     }
 
-    String dql = String.format(SparkDdlSQlTemplate.DQL_SQL_TEMPLATE, columnStr, sparkTableName);
-    generateSqlInfo.setDql(dql);
+    generateSqlInfo.setDql(SparkDdlSQlTemplate.generateDqlSql(columnStr, table));
     return generateSqlInfo;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org