You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/06 03:39:40 UTC
[inlong] branch master updated: [INLONG-4812][Manager] Choose the specified connector instead of all (#4832)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3ff63d25f [INLONG-4812][Manager] Choose the specified connector instead of all (#4832)
3ff63d25f is described below
commit 3ff63d25f7c3205b3024c5f02eae79f9288dbc31
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Wed Jul 6 11:39:35 2022 +0800
[INLONG-4812][Manager] Choose the specified connector instead of all (#4832)
---
.../manager/common/consts/InlongConstants.java | 17 ++-
.../manager/plugin/flink/FlinkOperation.java | 124 +++++++++++++++++++--
.../plugin/flink/enums/ConnectorJarType.java | 99 ++++++++++++++++
3 files changed, 230 insertions(+), 10 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index f1c336d49..2054de5f0 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -55,9 +55,24 @@ public class InlongConstants {
/**
* Sort config
*/
-
public static final String DATA_FLOW = "dataFlow";
+ public static final String STREAMS = "streams"; // JSON key of the stream list in the dataflow
+
+ public static final String RELATIONS = "relations"; // JSON key of the relation list inside a stream
+
+ public static final String INPUTS = "inputs"; // JSON key of the input node ids of a relation
+
+ public static final String OUTPUTS = "outputs"; // JSON key of the output node ids of a relation
+
+ public static final String NODES = "nodes"; // JSON key of the node list inside a stream
+
+ public static final String NODE_TYPE = "type"; // JSON key of the type field of each node
+
+ public static final String LOAD = "Load"; // suffix of load node types, e.g. "kafkaLoad"
+
+ public static final String EXTRACT = "Extract"; // suffix of extract node types, e.g. "mysqlExtract"
+
public static final String SORT_JOB_ID = "sort.job.id";
public static final String SORT_TYPE = "sort.type";
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index ccefecd73..b2afe90f5 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -17,13 +17,20 @@
package org.apache.inlong.manager.plugin.flink;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.ConnectorJarType;
import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
@@ -33,10 +40,16 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.apache.flink.api.common.JobStatus.RUNNING;
@@ -46,6 +59,7 @@ import static org.apache.flink.api.common.JobStatus.RUNNING;
@Slf4j
public class FlinkOperation {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // converts dataflow JSON nodes to Java collections
private static final String CONFIG_FILE = "application.properties";
private static final String CONNECTOR_DIR_KEY = "sort.connector.dir";
private static final String JOB_TERMINATED_MSG = "the job not found by id %s, "
@@ -53,6 +67,8 @@ public class FlinkOperation {
private static final String INLONG_MANAGER = "inlong-manager";
private static final String INLONG_SORT = "inlong-sort";
private static final String SORT_JAR_PATTERN = "^sort-dist.*jar$";
+ private static final String CONNECTOR_JAR_PATTERN = "^sort-connector-(?i)(%s).*jar$"; // %s: connector type; (?i) ignores case from there on
+ private static final String ALL_CONNECTOR_JAR_PATTERN = "^sort-connector-.*jar$"; // matches every sort connector jar
private static Properties properties;
private final FlinkService flinkService;
@@ -77,13 +93,11 @@ public class FlinkOperation {
- /**
- * Get Sort connector jar patterns from the Flink info.
- */
+ /**
+ * Get the Sort connector jar pattern for the given node type.
+ * <p/>
+ * Falls back to the pattern matching every Sort connector jar when the node type is not
+ * registered in {@link ConnectorJarType} or has no connector type configured.
+ *
+ * @param dataSourceType node type from the dataflow, such as "mysqlExtract" or "kafkaLoad"
+ * @return regex matching the connector jar file names for the node type
+ */
- private static String getConnectorJarPattern(FlinkInfo flinkInfo) {
- if (StringUtils.isNotEmpty(flinkInfo.getSourceType()) && StringUtils.isNotEmpty(flinkInfo.getSinkType())) {
- return String.format("^sort-connector-(?i)(%s|%s).*jar$", flinkInfo.getSourceType(),
- flinkInfo.getSinkType());
- } else {
- return "^sort-connector-.*jar$";
- }
+ private String getConnectorJarPattern(String dataSourceType) {
+ ConnectorJarType connectorJarType = ConnectorJarType.getInstance(dataSourceType);
+ // an empty connector type would build "(?i)()", which silently matches every jar: make that explicit
+ if (connectorJarType == null || StringUtils.isBlank(connectorJarType.getConnectorType())) {
+ return ALL_CONNECTOR_JAR_PATTERN;
+ }
+ return String.format(CONNECTOR_JAR_PATTERN, connectorJarType.getConnectorType());
}
/**
@@ -135,6 +149,83 @@ public class FlinkOperation {
}
}
+ /**
+ * Check whether there are duplicate NodeIds in different relations.
+ * <p/>
+ * The JSON data in the dataflow is in the reverse order of the nodes in the actual dataflow.
+ * For example, data flow A -> B -> C, the generated topological relationship is [[B,C],[A,B]],
+ * then the input node B in the first relation [B,C] is the second output node B in relation [A,B].
+ * <p/>
+ * The example of dataflow:
+ * <blockquote><pre>
+ * {
+ * "groupId": "test_group",
+ * "streams": [
+ * {
+ * "streamId": "test_stream",
+ * "relations": [
+ * {
+ * "type": "baseRelation",
+ * "inputs": [ "node_3" ],
+ * "outputs": [ "node_4" ]
+ * },
+ * {
+ * "type": "innerJoin",
+ * "inputs": [ "node_1", "node_2" ],
+ * "outputs": [ "node_3" ]
+ * }
+ * ]
+ * }
+ * ]
+ * }
+ * </pre></blockquote>
+ *
+ * @param dataflow dataflow JSON; only the relations of the first stream are checked
+ * @throws Exception if a relation has missing/empty inputs or outputs, shares a node id between
+ * its inputs and outputs, or uses an input node id that already appeared in an earlier relation
+ */
+ private void checkNodeIds(String dataflow) throws Exception {
+ JsonNode relations = JsonUtils.parseTree(dataflow).get(InlongConstants.STREAMS)
+ .get(0).get(InlongConstants.RELATIONS);
+ List<Pair<List<String>, List<String>>> nodeIdsPairList = new ArrayList<>();
+ for (JsonNode relation : relations) {
+ // convertValue yields null for an absent field, so nothing may dereference the list
+ // before the emptiness check (the previous .stream() call threw an NPE here)
+ List<String> inputIds = OBJECT_MAPPER.convertValue(relation.get(InlongConstants.INPUTS),
+ new TypeReference<List<String>>() {
+ });
+ if (CollectionUtils.isEmpty(inputIds)) {
+ String message = String.format("input nodeId %s cannot be empty", inputIds);
+ log.error(message);
+ throw new Exception(message);
+ }
+
+ List<String> outputIds = OBJECT_MAPPER.convertValue(relation.get(InlongConstants.OUTPUTS),
+ new TypeReference<List<String>>() {
+ });
+ if (CollectionUtils.isEmpty(outputIds)) {
+ String message = String.format("output nodeId %s cannot be empty", outputIds);
+ log.error(message);
+ throw new Exception(message);
+ }
+
+ if (!Collections.disjoint(inputIds, outputIds)) {
+ String message = String.format("input nodeId %s cannot be equal to output nodeId %s",
+ inputIds, outputIds);
+ log.error(message);
+ throw new Exception(message);
+ }
+ nodeIdsPairList.add(Pair.of(inputIds, outputIds));
+ }
+
+ if (nodeIdsPairList.size() > 1) {
+ // outputs of later relations are expected to reappear as inputs of earlier ones (see above),
+ // so only the input ids of the following relations are compared against what was seen so far
+ List<String> allNodeIds = new ArrayList<>(nodeIdsPairList.get(0).getLeft());
+ allNodeIds.addAll(nodeIdsPairList.get(0).getRight());
+ for (Pair<List<String>, List<String>> nodeIdsPair : nodeIdsPairList.subList(1, nodeIdsPairList.size())) {
+ List<String> inputIds = nodeIdsPair.getLeft();
+ if (!Collections.disjoint(allNodeIds, inputIds)) {
+ String message = String.format("input nodeId %s already exists ", inputIds);
+ log.error(message);
+ throw new Exception(message);
+ }
+ allNodeIds.addAll(inputIds);
+ }
+ }
+ }
+
/**
* Build Flink local path.
*/
@@ -161,15 +252,30 @@ public class FlinkOperation {
flinkInfo.setLocalJarPath(jarPath);
log.info("get sort jar path success, path: {}", jarPath);
+ // collect the types of all nodes declared in the dataflow, e.g. "mysqlExtract" or "kafkaLoad"
+ List<String> nodeTypes = new ArrayList<>();
+ if (StringUtils.isNotEmpty(dataflow)) {
+ checkNodeIds(dataflow);
+ JsonNode nodes = JsonUtils.parseTree(dataflow).get(InlongConstants.STREAMS)
+ .get(0).get(InlongConstants.NODES);
+ List<Map<String, Object>> nodeList = OBJECT_MAPPER.convertValue(nodes,
+ new TypeReference<List<Map<String, Object>>>() {
+ });
+ if (nodeList != null) {
+ // skip nodes without a type field instead of failing with a NullPointerException
+ nodeTypes.addAll(nodeList.stream()
+ .map(node -> node.get(InlongConstants.NODE_TYPE))
+ .filter(type -> type != null)
+ .map(Object::toString)
+ .collect(Collectors.toList()));
+ }
+ }
+
String connectorDir = getConnectorDir(startPath);
- List<String> connectorPaths = FlinkUtils.listFiles(connectorDir, getConnectorJarPattern(flinkInfo), -1);
+ // only Load/Extract node types are mapped to connector jars; each distinct type is listed once
+ Set<String> connectorPaths = nodeTypes.stream()
+ .filter(s -> s.endsWith(InlongConstants.LOAD) || s.endsWith(InlongConstants.EXTRACT))
+ .distinct()
+ .map(s -> FlinkUtils.listFiles(connectorDir, getConnectorJarPattern(s), -1))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+
if (CollectionUtils.isEmpty(connectorPaths)) {
String message = String.format("no sort connectors found in %s", connectorDir);
log.error(message);
throw new RuntimeException(message);
}
- flinkInfo.setConnectorJarPaths(connectorPaths);
+ flinkInfo.setConnectorJarPaths(new ArrayList<>(connectorPaths));
log.info("get sort connector paths success, paths: {}", connectorPaths);
if (FlinkUtils.writeConfigToFile(path, flinkInfo.getJobName(), dataflow)) {
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
new file mode 100644
index 000000000..9f383c610
--- /dev/null
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink.enums;
+
+import lombok.Getter;
+
+/**
+ * Maps dataflow node types to the connector name used in inlong-sort connector jar file names.
+ */
+@Getter
+public enum ConnectorJarType {
+
+ /**
+ * extract datasource type
+ */
+ MONGODB_SOURCE("mongoExtract", "mongodb-cdc"),
+
+ MYSQL_SOURCE("mysqlExtract", "mysql-cdc"),
+
+ KAFKA_SOURCE("kafkaExtract", "kafka"),
+
+ ORACLE_SOURCE("oracleExtract", "oracle-cdc"),
+
+ POSTGRES_SOURCE("postgresExtract", "postgres-cdc"),
+
+ SQLSERVER_SOURCE("sqlserverExtract", "sqlserver-cdc"),
+
+ /**
+ * load datasource type
+ */
+ MYSQL_SINK("mysqlLoad", "jdbc"),
+
+ KAFKA_SINK("kafkaLoad", "kafka"),
+
+ ORACLE_SINK("oracleLoad", "jdbc"),
+
+ POSTGRES_SINK("postgresLoad", "jdbc"),
+
+ SQLSERVER_SINK("sqlserverLoad", "jdbc"),
+
+ HBASE_SINK("hbaseLoad", "hbase"),
+
+ TDSQLPOSTGRES_SINK("tdsqlPostgresLoad", "jdbc"),
+
+ GREENPLUM_SINK("greenplumLoad", "jdbc"),
+
+ ELASTICSEARCH_SINK("elasticsearchLoad", "elasticsearch"),
+
+ CLICKHOUSE_SINK("clickHouseLoad", "jdbc"),
+
+ DLCICEBERG_SINK("dlcIcebergLoad", "dlc"),
+
+ HIVE_SINK("hiveLoad", "hive"),
+
+ ICEBERG_SINK("icebergLoad", "iceberg"),
+ // NOTE(review): empty connector type -> the derived jar pattern selects every sort-connector jar; confirm intended
+ HDFS_SINK("fileSystemLoad", ""),
+
+ ;
+
+ private final String sourceType;
+ private final String connectorType;
+
+ ConnectorJarType(String sourceType, String connectorType) {
+ this.connectorType = connectorType;
+ this.sourceType = sourceType;
+ }
+
+ /**
+ * Get the connector jar type registered for the given node type.
+ *
+ * @param type node type from the dataflow, such as "mysqlExtract"; may be null
+ * @return the matching enum constant, or null when the type is not registered
+ */
+ public static ConnectorJarType getInstance(String type) {
+ for (ConnectorJarType value : values()) {
+ if (value.getSourceType().equals(type)) {
+ return value;
+ }
+ }
+ return null;
+ }
+
+}