You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/05/05 09:35:03 UTC
[incubator-inlong] branch master updated: [INLONG-4022][Sort] Fix sort single pom
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c46f37723 [INLONG-4022][Sort] Fix sort single pom
c46f37723 is described below
commit c46f37723cbc0e5d95b0f699b847cbbce8873917
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Thu May 5 17:34:58 2022 +0800
[INLONG-4022][Sort] Fix sort single pom
---
.../inlong/sort/formats/base/TableFormatUtils.java | 1 +
inlong-sort/sort-single-tenant/pom.xml | 37 ++----
.../flink/cdc/mysql/table/MySqlTableSource.java | 27 ++---
.../flink/parser/FlinkSqlParserTest.java | 130 +++++++++++++--------
pom.xml | 13 +--
5 files changed, 108 insertions(+), 100 deletions(-)
diff --git a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index fb053a8e5..e0b572f59 100644
--- a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -93,6 +93,7 @@ import org.apache.inlong.sort.formats.common.VarCharFormatInfo;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+
import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
import static org.apache.flink.table.factories.TableFormatFactoryBase.deriveSchema;
import static org.apache.flink.util.Preconditions.checkState;
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 0acd78995..1318a6990 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -19,8 +19,8 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-sort</artifactId>
@@ -242,11 +242,6 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>io.debezium</groupId>
- <artifactId>debezium-connector-mysql</artifactId>
- <version>1.5.4.Final</version>
- </dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
@@ -266,31 +261,11 @@
<version>2.10.1</version>
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>2.10.1</version>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-connector-debezium</artifactId>
- <version>${flink-connector-debezium.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>kafka-log4j-appender</artifactId>
- <groupId>org.apache.kafka</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium-connector-mysql.version}</version>
</dependency>
-
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
@@ -331,11 +306,13 @@
<relocations>
<relocation>
<pattern>org.apache.flink.formats.avro</pattern>
- <shadedPattern>org.apache.inlong.shaded.flink.formats.avro</shadedPattern>
+ <shadedPattern>org.apache.inlong.shaded.flink.formats.avro
+ </shadedPattern>
</relocation>
</relocations>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
@@ -344,4 +321,4 @@
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
index 792dc304a..6d3e0ff29 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
@@ -18,19 +18,6 @@
package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.time.Duration;
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
@@ -49,6 +36,20 @@ import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.MetadataConv
import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.MySqlSource;
+import javax.annotation.Nullable;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A {@link DynamicTableSource} that describes how to create a MySQL binlog source from a logical
* description.
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
index ece7f5ce3..c238b7450 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
@@ -17,10 +17,6 @@
package org.apache.inlong.sort.singletenant.flink.parser;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -33,56 +29,86 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser;
import org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
/**
* Flink sql parser unit test class
*/
public class FlinkSqlParserTest extends AbstractTestBase {
- private MySqlExtractNode buildMySQLExtractNode() {
+ private MySqlExtractNode buildMySQLExtractNode(String id) {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
- new FieldInfo("name", new StringFormatInfo()),
- new FieldInfo("age", new IntFormatInfo()),
- new FieldInfo("salary", new FloatFormatInfo()),
- new FieldInfo("ts", new TimestampFormatInfo()));
- return new MySqlExtractNode("1", "mysql_input", fields,
- null, null, "id",
- Collections.singletonList("test"), "localhost", "username", "username",
- "test_database", null, null,
- null, null);
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldInfo("event_type", new StringFormatInfo()));
+ // If you want the Hive load to run in append mode, add this config.
+ Map<String, String> map = new HashMap<>();
+ map.put("append-mode", "true");
+ return new MySqlExtractNode(id, "mysql_input", fields,
+ null, map, "id",
+ Collections.singletonList("work1"), "localhost", "root", "password",
+ "inlong", null, null,
+ null, null);
+ }
+
+ private KafkaExtractNode buildKafkaExtractNode(String id) {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()));
+ WatermarkField wk = new WatermarkField(new FieldInfo("ts", new TimestampFormatInfo()),
+ new StringConstantParam("5"),
+ new TimeUnitConstantParam(TimeUnit.SECOND));
+ return new KafkaExtractNode(id, "kafka_input", fields, wk, null, "workerJson",
+ "localhost:9092", new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
}
- private KafkaLoadNode buildKafkaNode() {
+ private KafkaLoadNode buildKafkaNode(String id) {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
- new FieldInfo("name", new StringFormatInfo()),
- new FieldInfo("age", new IntFormatInfo()),
- new FieldInfo("salary", new FloatFormatInfo()),
- new FieldInfo("ts", new TimestampFormatInfo()));
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()));
List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
- new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
- new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
- new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
- new FieldInfo("ts", new TimestampFormatInfo()))
- );
- return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
- "topic", "localhost:9092",
- new CanalJsonFormat(), null,
- null, null);
+ .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())),
+ new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo())),
+ new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()))
+ );
+ return new KafkaLoadNode(id, "kafka_output", fields, relations, null,
+ "workerJson", "localhost:9092",
+ new JsonFormat(), null,
+ null, null);
}
private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
@@ -91,7 +117,7 @@ public class FlinkSqlParserTest extends AbstractTestBase {
return new NodeRelationShip(inputIds, outputIds);
}
- private HiveLoadNode buildHiveNode() {
+ private HiveLoadNode buildHiveNode(String id) {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("age", new IntFormatInfo()),
@@ -107,10 +133,10 @@ public class FlinkSqlParserTest extends AbstractTestBase {
new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
- return new HiveLoadNode("2", "hive_output",
+ return new HiveLoadNode(id, "hive_output",
fields, relations, null, 1,
- null, "myCatalog", "myDB", "myTable",
- "/opt/hive/conf/", "3.1.2",
+ null, "myCatalog", "default", "work2",
+ "/opt/hive/conf", "3.1.2",
null, null);
}
@@ -130,13 +156,25 @@ public class FlinkSqlParserTest extends AbstractTestBase {
env.setParallelism(1);
env.enableCheckpointing(10000);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
- Node inputNode = buildMySQLExtractNode();
- Node outputNode = buildHiveNode();
- StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(inputNode, outputNode),
- Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
- Collections.singletonList(outputNode))));
- GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
- FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ Node mysqlExtractNode = buildMySQLExtractNode("1_1");
+ Node kafkaExtractNode = buildKafkaExtractNode("1_2");
+ Node kafkaNode = buildKafkaNode("2_1");
+ Node hiveNode = buildHiveNode("2_2");
+ //mysql-->hive
+ StreamInfo streamInfoMySqlToHive = new StreamInfo("1L", Arrays.asList(mysqlExtractNode, hiveNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(mysqlExtractNode),
+ Collections.singletonList(hiveNode))));
+ GroupInfo groupInfoMySqlToHive = new GroupInfo("1", Collections.singletonList(streamInfoMySqlToHive));
+ //mysql-->kafka, kafka-->hive
+ StreamInfo streamInfoMySqlToKafkaToHive1 = new StreamInfo("1L", Arrays.asList(mysqlExtractNode, kafkaNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(mysqlExtractNode),
+ Collections.singletonList(kafkaNode))));
+ StreamInfo streamInfoMySqlToKafkaToHive2 = new StreamInfo("2L", Arrays.asList(kafkaExtractNode, hiveNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(kafkaExtractNode),
+ Collections.singletonList(hiveNode))));
+ GroupInfo groupInfoMySqlToKafkaToHive = new GroupInfo("1",
+ Arrays.asList(streamInfoMySqlToKafkaToHive1, streamInfoMySqlToKafkaToHive2));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfoMySqlToKafkaToHive);
parser.parse();
}
@@ -156,8 +194,8 @@ public class FlinkSqlParserTest extends AbstractTestBase {
env.setParallelism(1);
env.enableCheckpointing(10000);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
- Node inputNode = buildMySQLExtractNode();
- Node outputNode = buildKafkaNode();
+ Node inputNode = buildMySQLExtractNode("1");
+ Node outputNode = buildKafkaNode("2");
StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
Collections.singletonList(outputNode))));
diff --git a/pom.xml b/pom.xml
index d24341113..c5cf9e101 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,10 +172,6 @@
<jetty.version>9.4.44.v20210927</jetty.version>
<opencsv.version>5.4</opencsv.version>
<javax.servlet.api.version>4.0.1</javax.servlet.api.version>
-
- <flink-table-planner-blink_2.11.version>1.13.5</flink-table-planner-blink_2.11.version>
- <flink-connector-mysql-cdc.version>2.0.2</flink-connector-mysql-cdc.version>
-
<gson.version>2.8.6</gson.version>
<jackson.version>2.13.2</jackson.version>
<jackson.databind.version>2.13.2.2</jackson.databind.version>
@@ -245,15 +241,10 @@
<artifactId>flume-ng-node</artifactId>
<version>${flume.version}</version>
</dependency>
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>${flink-connector-mysql-cdc.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-blink_2.11</artifactId>
- <version>${flink-table-planner-blink_2.11.version}</version>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>