Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/06 03:39:40 UTC

[inlong] branch master updated: [INLONG-4812][Manager] Choose the specified connector instead of all (#4832)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ff63d25f [INLONG-4812][Manager] Choose the specified connector instead of all (#4832)
3ff63d25f is described below

commit 3ff63d25f7c3205b3024c5f02eae79f9288dbc31
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Wed Jul 6 11:39:35 2022 +0800

    [INLONG-4812][Manager] Choose the specified connector instead of all (#4832)
---
 .../manager/common/consts/InlongConstants.java     |  17 ++-
 .../manager/plugin/flink/FlinkOperation.java       | 124 +++++++++++++++++++--
 .../plugin/flink/enums/ConnectorJarType.java       |  99 ++++++++++++++++
 3 files changed, 230 insertions(+), 10 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index f1c336d49..2054de5f0 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -55,9 +55,24 @@ public class InlongConstants {
     /**
      * Sort config
      */
-
     public static final String DATA_FLOW = "dataFlow";
 
+    public static final String STREAMS = "streams";
+
+    public static final String RELATIONS = "relations";
+
+    public static final String INPUTS = "inputs";
+
+    public static final String OUTPUTS = "outputs";
+
+    public static final String NODES = "nodes";
+
+    public static final String NODE_TYPE = "type";
+
+    public static final String LOAD = "Load";
+
+    public static final String EXTRACT = "Extract";
+
     public static final String SORT_JOB_ID = "sort.job.id";
 
     public static final String SORT_TYPE = "sort.type";
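
The constants added above name the fields of the Sort dataflow JSON that the manager plugin parses (streams, relations, inputs, outputs, nodes, type), plus the "Extract"/"Load" suffixes that mark source and sink node types. Below is a minimal sketch (not part of this commit) of reading those fields with Jackson, using a hypothetical dataflow string:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class DataflowKeysSketch {

        public static void main(String[] args) throws Exception {
            // Hypothetical dataflow snippet; the field names match the constants above.
            String dataflow = "{\"streams\":[{\"streamId\":\"test_stream\","
                    + "\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"node_1\"],\"outputs\":[\"node_2\"]}],"
                    + "\"nodes\":[{\"type\":\"mysqlExtract\"},{\"type\":\"kafkaLoad\"}]}]}";

            JsonNode stream = new ObjectMapper().readTree(dataflow).get("streams").get(0);

            // RELATIONS / INPUTS / OUTPUTS drive the node-id checks in checkNodeIds().
            System.out.println("inputs of first relation: " + stream.get("relations").get(0).get("inputs"));

            // NODES / NODE_TYPE drive the connector selection; the "Extract" and "Load"
            // suffixes mark source and sink nodes respectively.
            for (JsonNode node : stream.get("nodes")) {
                System.out.println("node type: " + node.get("type").asText());
            }
        }
    }
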
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index ccefecd73..b2afe90f5 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -17,13 +17,20 @@
 
 package org.apache.inlong.manager.plugin.flink;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.ConnectorJarType;
 import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
 import org.apache.inlong.manager.plugin.util.FlinkUtils;
 
@@ -33,10 +40,16 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.api.common.JobStatus.RUNNING;
 
@@ -46,6 +59,7 @@ import static org.apache.flink.api.common.JobStatus.RUNNING;
 @Slf4j
 public class FlinkOperation {
 
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final String CONFIG_FILE = "application.properties";
     private static final String CONNECTOR_DIR_KEY = "sort.connector.dir";
     private static final String JOB_TERMINATED_MSG = "the job not found by id %s, "
@@ -53,6 +67,8 @@ public class FlinkOperation {
     private static final String INLONG_MANAGER = "inlong-manager";
     private static final String INLONG_SORT = "inlong-sort";
     private static final String SORT_JAR_PATTERN = "^sort-dist.*jar$";
+    private static final String CONNECTOR_JAR_PATTERN = "^sort-connector-(?i)(%s).*jar$";
+    private static final String ALL_CONNECTOR_JAR_PATTERN = "^sort-connector-.*jar$";
     private static Properties properties;
     private final FlinkService flinkService;
 
@@ -77,13 +93,11 @@ public class FlinkOperation {
     /**
      * Get Sort connector jar patterns from the Flink info.
      */
-    private static String getConnectorJarPattern(FlinkInfo flinkInfo) {
-        if (StringUtils.isNotEmpty(flinkInfo.getSourceType()) && StringUtils.isNotEmpty(flinkInfo.getSinkType())) {
-            return String.format("^sort-connector-(?i)(%s|%s).*jar$", flinkInfo.getSourceType(),
-                    flinkInfo.getSinkType());
-        } else {
-            return "^sort-connector-.*jar$";
-        }
+    private String getConnectorJarPattern(String dataSourceType) {
+        ConnectorJarType connectorJarType = ConnectorJarType.getInstance(dataSourceType);
+        return connectorJarType == null
+                ? ALL_CONNECTOR_JAR_PATTERN : String.format(CONNECTOR_JAR_PATTERN, connectorJarType.getConnectorType());
+
     }
 
     /**
@@ -135,6 +149,83 @@ public class FlinkOperation {
         }
     }
 
+    /**
+     * Check whether there are duplicate NodeIds in different relations.
+     * <p/>
+     * The relations in the dataflow JSON are listed in the reverse order of the actual dataflow.
+     * For example, for the data flow A -> B -> C, the generated topological relationship is [[B,C],[A,B]],
+     * so the input node B of the first relation [B,C] is the output node B of the second relation [A,B].
+     * <p/>
+     * The example of dataflow:
+     * <blockquote><pre>
+     * {
+     *     "groupId": "test_group",
+     *     "streams": [
+     *         {
+     *             "streamId": "test_stream",
+     *             "relations": [
+     *                 {
+     *                     "type": "baseRelation",
+     *                     "inputs": [ "node_3" ],
+     *                     "outputs": [ "node_4" ]
+     *                 },
+     *                 {
+     *                     "type": "innerJoin",
+     *                     "inputs": [ "node_1", "node_2" ],
+     *                     "outputs": [ "node_3"  ]
+     *                 }
+     *             ]
+     *         }
+     *     ]
+     * }
+     * </pre></blockquote>
+     */
+    private void checkNodeIds(String dataflow) throws Exception {
+        JsonNode relations = JsonUtils.parseTree(dataflow).get(InlongConstants.STREAMS)
+                .get(0).get(InlongConstants.RELATIONS);
+        List<Pair<List<String>, List<String>>> nodeIdsPairList = new ArrayList<>();
+        for (int i = 0; i < relations.size(); i++) {
+            List<String> inputIds = OBJECT_MAPPER.convertValue(relations.get(i).get(InlongConstants.INPUTS),
+                    new TypeReference<List<String>>() {
+                    }).stream().collect(Collectors.toList());
+            if (CollectionUtils.isEmpty(inputIds)) {
+                String message = String.format("input nodeId %s cannot be empty", inputIds);
+                log.error(message);
+                throw new Exception(message);
+            }
+
+            List<String> outputIds = OBJECT_MAPPER.convertValue(relations.get(i).get(InlongConstants.OUTPUTS),
+                    new TypeReference<List<String>>() {
+                    }).stream().collect(Collectors.toList());
+            if (CollectionUtils.isEmpty(outputIds)) {
+                String message = String.format("output nodeId %s cannot be empty", outputIds);
+                log.error(message);
+                throw new Exception(message);
+            }
+
+            if (!Collections.disjoint(inputIds, outputIds)) {
+                String message = String.format("input nodeId %s cannot be equal to output nodeId %s",
+                        inputIds, outputIds);
+                log.error(message);
+                throw new Exception(message);
+            }
+            nodeIdsPairList.add(Pair.of(inputIds, outputIds));
+        }
+
+        if (nodeIdsPairList.size() > 1) {
+            List<String> allNodeIds = new ArrayList<>(nodeIdsPairList.get(0).getLeft());
+            allNodeIds.addAll(nodeIdsPairList.get(0).getRight());
+            for (int i = 1; i < relations.size(); i++) {
+                if (!Collections.disjoint(allNodeIds, nodeIdsPairList.get(i).getLeft())) {
+                    String message = String.format("input nodeId %s already exists ", nodeIdsPairList.get(i).getLeft());
+                    log.error(message);
+                    throw new Exception(message);
+                }
+                allNodeIds.addAll(nodeIdsPairList.get(i).getLeft());
+            }
+        }
+    }
+
     /**
      * Build Flink local path.
      */
@@ -161,15 +252,30 @@ public class FlinkOperation {
         flinkInfo.setLocalJarPath(jarPath);
         log.info("get sort jar path success, path: {}", jarPath);
 
+        List<String> nodeTypes = new ArrayList<>();
+        if (StringUtils.isNotEmpty(dataflow)) {
+            checkNodeIds(dataflow);
+            JsonNode nodes = JsonUtils.parseTree(dataflow).get(InlongConstants.STREAMS)
+                    .get(0).get(InlongConstants.NODES);
+            List<String> types = OBJECT_MAPPER.convertValue(nodes,
+                    new TypeReference<List<Map<String, Object>>>() {
+                    }).stream().map(s -> s.get(InlongConstants.NODE_TYPE).toString()).collect(Collectors.toList());
+            nodeTypes.addAll(types);
+        }
+
         String connectorDir = getConnectorDir(startPath);
-        List<String> connectorPaths = FlinkUtils.listFiles(connectorDir, getConnectorJarPattern(flinkInfo), -1);
+        Set<String> connectorPaths = nodeTypes.stream().filter(
+                s -> s.endsWith(InlongConstants.LOAD) || s.endsWith(InlongConstants.EXTRACT)).map(
+                s -> FlinkUtils.listFiles(connectorDir, getConnectorJarPattern(s), -1)
+        ).flatMap(Collection::stream).collect(Collectors.toSet());
+
         if (CollectionUtils.isEmpty(connectorPaths)) {
             String message = String.format("no sort connectors found in %s", connectorDir);
             log.error(message);
             throw new RuntimeException(message);
         }
 
-        flinkInfo.setConnectorJarPaths(connectorPaths);
+        flinkInfo.setConnectorJarPaths(new ArrayList<>(connectorPaths));
         log.info("get sort connector paths success, paths: {}", connectorPaths);
 
         if (FlinkUtils.writeConfigToFile(path, flinkInfo.getJobName(), dataflow)) {
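
With the changes above, FlinkOperation collects the node types from the dataflow, keeps only those ending with "Extract" or "Load", maps each through ConnectorJarType to a connector name, and lists only the connector jars whose file names match the per-type pattern; a type with no ConnectorJarType mapping falls back to the all-connectors pattern. A minimal sketch (not part of this commit) of how one pattern is matched, using hypothetical jar file names:

    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;

    public class ConnectorSelectionSketch {

        public static void main(String[] args) {
            // Hypothetical contents of the sort.connector.dir directory.
            List<String> jars = Arrays.asList(
                    "sort-connector-mysql-cdc-1.2.0.jar",
                    "sort-connector-kafka-1.2.0.jar",
                    "sort-connector-hive-1.2.0.jar");

            // For a "mysqlExtract" node, ConnectorJarType resolves to "mysql-cdc",
            // which is formatted into CONNECTOR_JAR_PATTERN.
            Pattern pattern = Pattern.compile(String.format("^sort-connector-(?i)(%s).*jar$", "mysql-cdc"));

            List<String> selected = jars.stream()
                    .filter(name -> pattern.matcher(name).matches())
                    .collect(Collectors.toList());

            // Prints [sort-connector-mysql-cdc-1.2.0.jar]: only the required connector is selected.
            System.out.println(selected);
        }
    }

Selecting only the connectors required by the dataflow keeps the jar set submitted to Flink small, instead of shipping every sort-connector jar as before.
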
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
new file mode 100644
index 000000000..9f383c610
--- /dev/null
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink.enums;
+
+import lombok.Getter;
+
+/**
+ * Connectors corresponding to different datasource types in inlong-sort
+ */
+@Getter
+public enum ConnectorJarType {
+
+    /**
+     * extract datasource type
+     */
+    MONGODB_SOURCE("mongoExtract", "mongodb-cdc"),
+
+    MYSQL_SOURCE("mysqlExtract", "mysql-cdc"),
+
+    KAFKA_SOURCE("kafkaExtract", "kafka"),
+
+    ORACLE_SOURCE("oracleExtract", "oracle-cdc"),
+
+    POSTGRES_SOURCE("postgresExtract", "postgres-cdc"),
+
+    SQLSERVER_SOURCE("sqlserverExtract", "sqlserver-cdc"),
+
+    /**
+     * load datasource type
+     */
+    MYSQL_SINK("mysqlLoad", "jdbc"),
+
+    KAFKA_SINK("kafkaLoad", "kafka"),
+
+    ORACLE_SINK("oracleLoad", "jdbc"),
+
+    POSTGRES_SINK("postgresLoad", "jdbc"),
+
+    SQLSERVER_SINK("sqlserverLoad", "jdbc"),
+
+    HBASE_SINK("hbaseLoad", "hbase"),
+
+    TDSQLPOSTGRES_SINK("tdsqlPostgresLoad", "jdbc"),
+
+    GREENPLUM_SINK("greenplumLoad", "jdbc"),
+
+    ELASTICSEARCH_SINK("elasticsearchLoad", "elasticsearch"),
+
+    CLICKHOUSE_SINK("clickHouseLoad", "jdbc"),
+
+    DLCICEBERG_SINK("dlcIcebergLoad", "dlc"),
+
+    HIVE_SINK("hiveLoad", "hive"),
+
+    ICEBERG_SINK("icebergLoad", "iceberg"),
+
+    HDFS_SINK("fileSystemLoad", ""),
+
+    ;
+
+    private String sourceType;
+    private String connectorType;
+
+    ConnectorJarType(String sourceType, String connectorType) {
+        this.connectorType = connectorType;
+        this.sourceType = sourceType;
+    }
+
+    /**
+     * Get the ConnectorJarType for the specified datasource type.
+     *
+     * @param type the datasource type, such as "mysqlExtract" or "kafkaLoad"
+     * @return the matching ConnectorJarType, or null if no connector matches the type
+     */
+    public static ConnectorJarType getInstance(String type) {
+        for (ConnectorJarType value : values()) {
+            if (value.getSourceType().equals(type)) {
+                return value;
+            }
+        }
+        return null;
+    }
+
+}
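
A minimal usage sketch for the new enum (not part of this commit), showing the null fallback that getConnectorJarPattern relies on:

    import org.apache.inlong.manager.plugin.flink.enums.ConnectorJarType;

    public class ConnectorJarTypeSketch {

        public static void main(String[] args) {
            // Known type: resolves to the connector bundled in inlong-sort.
            ConnectorJarType oracle = ConnectorJarType.getInstance("oracleLoad");
            System.out.println(oracle.getConnectorType());   // prints "jdbc"

            // Unknown type: getInstance returns null, so FlinkOperation falls back
            // to ALL_CONNECTOR_JAR_PATTERN and ships every connector jar.
            System.out.println(ConnectorJarType.getInstance("unknownLoad"));  // prints "null"
        }
    }
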