Posted to commits@inlong.apache.org by zi...@apache.org on 2022/05/05 09:35:03 UTC

[incubator-inlong] branch master updated: [INLONG-4022][Sort] Fix sort single pom

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c46f37723 [INLONG-4022][Sort] Fix sort single pom
c46f37723 is described below

commit c46f37723cbc0e5d95b0f699b847cbbce8873917
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Thu May 5 17:34:58 2022 +0800

    [INLONG-4022][Sort] Fix sort single pom
---
 .../inlong/sort/formats/base/TableFormatUtils.java |   1 +
 inlong-sort/sort-single-tenant/pom.xml             |  37 ++----
 .../flink/cdc/mysql/table/MySqlTableSource.java    |  27 ++---
 .../flink/parser/FlinkSqlParserTest.java           | 130 +++++++++++++--------
 pom.xml                                            |  13 +--
 5 files changed, 108 insertions(+), 100 deletions(-)
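For reference, a minimal, hypothetical sketch (the consuming module is an assumption, not part of this commit) of how a module can declare the Flink planner dependency after this change: the artifactId is parameterized on scala.binary.version and the version is inherited from the root pom's dependencyManagement, which now pins it to ${flink.version} instead of a separate hardcoded property.

    <!-- Hypothetical consumer snippet: the version resolves against the root
         pom's dependencyManagement, so no <version> is declared here. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    </dependency>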

diff --git a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index fb053a8e5..e0b572f59 100644
--- a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -93,6 +93,7 @@ import org.apache.inlong.sort.formats.common.VarCharFormatInfo;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+
 import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
 import static org.apache.flink.table.factories.TableFormatFactoryBase.deriveSchema;
 import static org.apache.flink.util.Preconditions.checkState;
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 0acd78995..1318a6990 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -19,8 +19,8 @@
 -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.inlong</groupId>
         <artifactId>inlong-sort</artifactId>
@@ -242,11 +242,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>io.debezium</groupId>
-            <artifactId>debezium-connector-mysql</artifactId>
-            <version>1.5.4.Final</version>
-        </dependency>
         <dependency>
             <groupId>io.debezium</groupId>
             <artifactId>debezium-core</artifactId>
@@ -266,31 +261,11 @@
             <version>2.10.1</version>
             <scope>compile</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <version>2.10.1</version>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>com.ververica</groupId>
-            <artifactId>flink-connector-debezium</artifactId>
-            <version>${flink-connector-debezium.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>kafka-log4j-appender</artifactId>
-                    <groupId>org.apache.kafka</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <dependency>
             <groupId>io.debezium</groupId>
             <artifactId>debezium-connector-mysql</artifactId>
             <version>${debezium-connector-mysql.version}</version>
         </dependency>
-
         <dependency>
             <groupId>io.debezium</groupId>
             <artifactId>debezium-core</artifactId>
@@ -331,11 +306,13 @@
                             <relocations>
                                 <relocation>
                                     <pattern>org.apache.flink.formats.avro</pattern>
-                                    <shadedPattern>org.apache.inlong.shaded.flink.formats.avro</shadedPattern>
+                                    <shadedPattern>org.apache.inlong.shaded.flink.formats.avro
+                                    </shadedPattern>
                                 </relocation>
                             </relocations>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                             </transformers>
                         </configuration>
                     </execution>
@@ -344,4 +321,4 @@
         </plugins>
     </build>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
index 792dc304a..6d3e0ff29 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
@@ -18,19 +18,6 @@
 
 package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.time.Duration;
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -49,6 +36,20 @@ import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.MetadataConv
 import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
 import org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.MySqlSource;
 
+import javax.annotation.Nullable;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link DynamicTableSource} that describes how to create a MySQL binlog source from a logical
  * description.
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
index ece7f5ce3..c238b7450 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
@@ -17,10 +17,6 @@
 
 package org.apache.inlong.sort.singletenant.flink.parser;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -33,56 +29,86 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
 import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
 import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser;
 import org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * Flink sql parser unit test class
  */
 public class FlinkSqlParserTest extends AbstractTestBase {
 
-    private MySqlExtractNode buildMySQLExtractNode() {
+    private MySqlExtractNode buildMySQLExtractNode(String id) {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
-            new FieldInfo("name", new StringFormatInfo()),
-            new FieldInfo("age", new IntFormatInfo()),
-            new FieldInfo("salary", new FloatFormatInfo()),
-            new FieldInfo("ts", new TimestampFormatInfo()));
-        return new MySqlExtractNode("1", "mysql_input", fields,
-            null, null, "id",
-            Collections.singletonList("test"), "localhost", "username", "username",
-            "test_database", null, null,
-            null, null);
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()),
+                new FieldInfo("event_type", new StringFormatInfo()));
+        // If you want the Hive load node to use append mode, add this config.
+        Map<String, String> map = new HashMap<>();
+        map.put("append-mode", "true");
+        return new MySqlExtractNode(id, "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("work1"), "localhost", "root", "password",
+                "inlong", null, null,
+                null, null);
+    }
+
+    private KafkaExtractNode buildKafkaExtractNode(String id) {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        WatermarkField wk = new WatermarkField(new FieldInfo("ts", new TimestampFormatInfo()),
+                new StringConstantParam("5"),
+                new TimeUnitConstantParam(TimeUnit.SECOND));
+        return new KafkaExtractNode(id, "kafka_input", fields, wk, null, "workerJson",
+                "localhost:9092", new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
     }
 
-    private KafkaLoadNode buildKafkaNode() {
+    private KafkaLoadNode buildKafkaNode(String id) {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
-            new FieldInfo("name", new StringFormatInfo()),
-            new FieldInfo("age", new IntFormatInfo()),
-            new FieldInfo("salary", new FloatFormatInfo()),
-            new FieldInfo("ts", new TimestampFormatInfo()));
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
         List<FieldRelationShip> relations = Arrays
-            .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
-                    new FieldInfo("id", new LongFormatInfo())),
-                new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
-                    new FieldInfo("name", new StringFormatInfo())),
-                new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
-                    new FieldInfo("age", new IntFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
-                    new FieldInfo("ts", new TimestampFormatInfo()))
-            );
-        return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
-            "topic", "localhost:9092",
-            new CanalJsonFormat(), null,
-            null, null);
+                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+                                new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                                new FieldInfo("ts", new TimestampFormatInfo()))
+                );
+        return new KafkaLoadNode(id, "kafka_output", fields, relations, null,
+                "workerJson", "localhost:9092",
+                new JsonFormat(), null,
+                null, null);
     }
 
     private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
@@ -91,7 +117,7 @@ public class FlinkSqlParserTest extends AbstractTestBase {
         return new NodeRelationShip(inputIds, outputIds);
     }
 
-    private HiveLoadNode buildHiveNode() {
+    private HiveLoadNode buildHiveNode(String id) {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
                 new FieldInfo("name", new StringFormatInfo()),
                 new FieldInfo("age", new IntFormatInfo()),
@@ -107,10 +133,10 @@ public class FlinkSqlParserTest extends AbstractTestBase {
                         new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
-        return new HiveLoadNode("2", "hive_output",
+        return new HiveLoadNode(id, "hive_output",
                 fields, relations, null, 1,
-                null, "myCatalog", "myDB", "myTable",
-                "/opt/hive/conf/", "3.1.2",
+                null, "myCatalog", "default", "work2",
+                "/opt/hive/conf", "3.1.2",
                 null, null);
     }
 
@@ -130,13 +156,25 @@ public class FlinkSqlParserTest extends AbstractTestBase {
         env.setParallelism(1);
         env.enableCheckpointing(10000);
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
-        Node inputNode = buildMySQLExtractNode();
-        Node outputNode = buildHiveNode();
-        StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(inputNode, outputNode),
-                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
-                        Collections.singletonList(outputNode))));
-        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
-        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        Node mysqlExtractNode = buildMySQLExtractNode("1_1");
+        Node kafkaExtractNode = buildKafkaExtractNode("1_2");
+        Node kafkaNode = buildKafkaNode("2_1");
+        Node hiveNode = buildHiveNode("2_2");
+        //mysql-->hive
+        StreamInfo streamInfoMySqlToHive = new StreamInfo("1L", Arrays.asList(mysqlExtractNode, hiveNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(mysqlExtractNode),
+                        Collections.singletonList(hiveNode))));
+        GroupInfo groupInfoMySqlToHive = new GroupInfo("1", Collections.singletonList(streamInfoMySqlToHive));
+        //mysql-->kafka, kafka-->hive
+        StreamInfo streamInfoMySqlToKafkaToHive1 = new StreamInfo("1L", Arrays.asList(mysqlExtractNode, kafkaNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(mysqlExtractNode),
+                        Collections.singletonList(kafkaNode))));
+        StreamInfo streamInfoMySqlToKafkaToHive2 = new StreamInfo("2L", Arrays.asList(kafkaExtractNode, hiveNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(kafkaExtractNode),
+                        Collections.singletonList(hiveNode))));
+        GroupInfo groupInfoMySqlToKafkaToHive = new GroupInfo("1",
+                Arrays.asList(streamInfoMySqlToKafkaToHive1, streamInfoMySqlToKafkaToHive2));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfoMySqlToKafkaToHive);
         parser.parse();
     }
 
@@ -156,8 +194,8 @@ public class FlinkSqlParserTest extends AbstractTestBase {
         env.setParallelism(1);
         env.enableCheckpointing(10000);
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
-        Node inputNode = buildMySQLExtractNode();
-        Node outputNode = buildKafkaNode();
+        Node inputNode = buildMySQLExtractNode("1");
+        Node outputNode = buildKafkaNode("2");
         StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
                 Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
                         Collections.singletonList(outputNode))));
diff --git a/pom.xml b/pom.xml
index d24341113..c5cf9e101 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,10 +172,6 @@
         <jetty.version>9.4.44.v20210927</jetty.version>
         <opencsv.version>5.4</opencsv.version>
         <javax.servlet.api.version>4.0.1</javax.servlet.api.version>
-
-        <flink-table-planner-blink_2.11.version>1.13.5</flink-table-planner-blink_2.11.version>
-        <flink-connector-mysql-cdc.version>2.0.2</flink-connector-mysql-cdc.version>
-
         <gson.version>2.8.6</gson.version>
         <jackson.version>2.13.2</jackson.version>
         <jackson.databind.version>2.13.2.2</jackson.databind.version>
@@ -245,15 +241,10 @@
                 <artifactId>flume-ng-node</artifactId>
                 <version>${flume.version}</version>
             </dependency>
-            <dependency>
-                <groupId>com.ververica</groupId>
-                <artifactId>flink-connector-mysql-cdc</artifactId>
-                <version>${flink-connector-mysql-cdc.version}</version>
-            </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
-                <artifactId>flink-table-planner-blink_2.11</artifactId>
-                <version>${flink-table-planner-blink_2.11.version}</version>
+                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flume</groupId>