You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/05/31 09:08:05 UTC
[incubator-inlong] branch master updated: [INLONG-4448][Sort] Add Greenplum database data load support #4450
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f25741506 [INLONG-4448][Sort] Add Greenplum database data load support #4450
f25741506 is described below
commit f2574150635a575864cc5545aabd8181b5db55d9
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Tue May 31 17:08:00 2022 +0800
[INLONG-4448][Sort] Add Greenplum database data load support #4450
---
.../apache/inlong/sort/protocol/node/LoadNode.java | 4 +-
.../org/apache/inlong/sort/protocol/node/Node.java | 4 +-
.../sort/protocol/node/load/GreenplumLoadNode.java | 112 +++++++++++++++++++
.../protocol/transformation/FunctionParam.java | 2 +
.../protocol/node/load/GreenplumLoadNodeTest.java | 50 +++++++++
.../inlong/sort/jdbc/dialect/GreenplumDialect.java | 34 ++++++
.../inlong/sort/jdbc/table/JdbcDialects.java | 17 ++-
.../sort/jdbc/table/JdbcDynamicTableFactory.java | 19 ++--
.../sort/parser/GreenplumLoadSqlParseTest.java | 124 +++++++++++++++++++++
9 files changed, 355 insertions(+), 11 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 283e5e5ac..6192e1c53 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -30,6 +30,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
@@ -60,7 +61,8 @@ import java.util.Map;
@JsonSubTypes.Type(value = ClickHouseLoadNode.class, name = "clickHouseLoad"),
@JsonSubTypes.Type(value = SqlServerLoadNode.class, name = "sqlserverLoad"),
@JsonSubTypes.Type(value = TDSQLPostgresLoadNode.class, name = "tdsqlPostgresLoad"),
- @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad")
+ @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad"),
+ @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad")
})
@NoArgsConstructor
@Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 0c62e0d37..7376d7062 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -32,6 +32,7 @@ import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
@@ -75,7 +76,8 @@ import java.util.TreeMap;
@JsonSubTypes.Type(value = SqlServerLoadNode.class, name = "sqlserverLoad"),
@JsonSubTypes.Type(value = TDSQLPostgresLoadNode.class, name = "tdsqlPostgresLoad"),
@JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad"),
- @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad")
+ @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
+ @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad")
})
public interface Node {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/GreenplumLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/GreenplumLoadNode.java
new file mode 100644
index 000000000..0526b014d
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/GreenplumLoadNode.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.PostgresConstant;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Greenplum load node can load data into Greenplum
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("greenplumLoad")
+@Data
+@NoArgsConstructor
+public class GreenplumLoadNode extends LoadNode implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * jdbc:postgresql://host:port/database
+ */
+ @JsonProperty("url")
+ private String url;
+ @JsonProperty("username")
+ private String username;
+ @JsonProperty("password")
+ private String password;
+ @JsonProperty("tableName")
+ private String tableName;
+ /**
+ * Please declare primary key for sink table when query contains update/delete record if your version support
+ * upsert. You can change source stream mode is "append" if your version can't support upsert.
+ */
+ @JsonProperty("primaryKey")
+ private String primaryKey;
+
+ @JsonCreator
+ public GreenplumLoadNode(
+ @JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+ @JsonProperty("filters") List<FilterFunction> filters,
+ @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+ @JsonProperty("sinkParallelism") Integer sinkParallelism,
+ @JsonProperty("properties") Map<String, String> properties,
+ @JsonProperty("url") String url,
+ @JsonProperty("username") String username,
+ @JsonProperty("password") String password,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("primaryKey") String primaryKey) {
+ super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
+ this.url = Preconditions.checkNotNull(url, "url is null");
+ this.username = Preconditions.checkNotNull(username, "username is null");
+ this.password = Preconditions.checkNotNull(password, "password is null");
+ this.tableName = Preconditions.checkNotNull(tableName, "tableName is null");
+ this.primaryKey = primaryKey;
+ }
+
+ @Override
+ public Map<String, String> tableOptions() {
+ Map<String, String> options = super.tableOptions();
+ options.put(PostgresConstant.CONNECTOR, PostgresConstant.JDBC_INLONG);
+ options.put(PostgresConstant.URL, url);
+ options.put("dialect-impl", "org.apache.inlong.sort.jdbc.dialect.GreenplumDialect");
+ options.put(PostgresConstant.USERNAME, username);
+ options.put(PostgresConstant.PASSWORD, password);
+ options.put(PostgresConstant.TABLE_NAME, tableName);
+ return options;
+ }
+
+ @Override
+ public String genTableName() {
+ return String.format("table_%s", super.getId());
+ }
+
+ @Override
+ public String getPrimaryKey() {
+ return primaryKey;
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
index cca2fb755..b1ef83e54 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
@@ -22,6 +22,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSub
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
import org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
@@ -63,6 +64,7 @@ import org.apache.inlong.sort.protocol.transformation.operator.OrOperator;
@JsonSubTypes({
@JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
@JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin"),
+ @JsonSubTypes.Type(value = MetaFieldInfo.class, name = "metaField"),
@JsonSubTypes.Type(value = ConstantParam.class, name = "constant"),
@JsonSubTypes.Type(value = TimeUnitConstantParam.class, name = "timeUnitConstant"),
@JsonSubTypes.Type(value = StringConstantParam.class, name = "stringConstant"),
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/GreenplumLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/GreenplumLoadNodeTest.java
new file mode 100644
index 000000000..38c4b0dd5
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/GreenplumLoadNodeTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.Collections;
+
+/**
+ * Test for {@link GreenplumLoadNode}
+ */
+public class GreenplumLoadNodeTest extends SerializeBaseTest<GreenplumLoadNode> {
+
+ /**
+ * Get test object
+ *
+ * @return The test object
+ */
+ @Override
+ public GreenplumLoadNode getTestObject() {
+ return new GreenplumLoadNode("1", "greenplum_output",
+ Collections.singletonList(new FieldInfo("name", new StringFormatInfo())),
+ Collections.singletonList(
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()))
+ ),
+ null, null, 1, null,
+ "jdbc:postgresql://localhost:5432/inlong", "inlong",
+ "inlong", "student", "name");
+ }
+}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/GreenplumDialect.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/GreenplumDialect.java
new file mode 100644
index 000000000..4e3289264
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/GreenplumDialect.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect;
+
+import org.apache.flink.connector.jdbc.dialect.PostgresDialect;
+
+import java.util.Optional;
+
+/**
+ * JDBC dialect for Greenplum.
+ */
+public class GreenplumDialect extends PostgresDialect {
+
+ @Override
+ public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ return Optional.empty();
+ }
+}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
index e62cf5b2c..9de7f1c0d 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
@@ -25,7 +25,9 @@ import org.apache.inlong.sort.jdbc.dialect.SqlServerDialect;
import org.apache.inlong.sort.jdbc.dialect.TDSQLPostgresDialect;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/**
@@ -35,6 +37,8 @@ public final class JdbcDialects {
private static final List<JdbcDialect> DIALECTS = new ArrayList<>();
+ private static final Map<String, JdbcDialect> CUSTOM_DIALECTS = new LinkedHashMap<>();
+
static {
DIALECTS.add(new MySQLDialect());
DIALECTS.add(new TDSQLPostgresDialect());
@@ -54,13 +58,22 @@ public final class JdbcDialects {
return Optional.empty();
}
+ public static Optional<JdbcDialect> getCustomDialect(String dialectImpl) {
+ JdbcDialect jdbcDialect = CUSTOM_DIALECTS.get(dialectImpl);
+ if (jdbcDialect != null) {
+ return Optional.of(jdbcDialect);
+ }
+ return Optional.empty();
+ }
+
/**
* Fetch the JdbcDialect class corresponding to a given database url.
*/
- public static void register(String dialectImpl) {
+ public static Optional<JdbcDialect> register(String dialectImpl) {
try {
JdbcDialect dialect = (JdbcDialect) Class.forName(dialectImpl).newInstance();
- DIALECTS.add(dialect);
+ CUSTOM_DIALECTS.put(dialectImpl, dialect);
+ return Optional.of(dialect);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException("Cannot register such dialect impl: " + dialectImpl, e);
}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index 006550ba1..1aa99c21b 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -205,11 +205,18 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
final String url = readableConfig.get(URL);
+ Optional<String> dialectImplOptional = readableConfig.getOptional(DIALECT_IMPL);
+ Optional<JdbcDialect> jdbcDialect;
+ if (dialectImplOptional.isPresent()) {
+ jdbcDialect = JdbcDialects.getCustomDialect(dialectImplOptional.get());
+ } else {
+ jdbcDialect = JdbcDialects.get(url);
+ }
final JdbcOptions.Builder builder =
JdbcOptions.builder()
.setDBUrl(url)
.setTableName(readableConfig.get(TABLE_NAME))
- .setDialect(JdbcDialects.get(url).get())
+ .setDialect(jdbcDialect.get())
.setParallelism(
readableConfig
.getOptional(FactoryUtil.SINK_PARALLELISM)
@@ -305,14 +312,13 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
}
private void validateConfigOptions(ReadableConfig config) {
- // register custom dialect first
- config.getOptional(DIALECT_IMPL).ifPresent(JdbcDialects::register);
+ // Register custom dialect first
+ Optional<String> dialectImplOptional = config.getOptional(DIALECT_IMPL);
String jdbcUrl = config.get(URL);
- final Optional<JdbcDialect> dialect = JdbcDialects.get(jdbcUrl);
+ final Optional<JdbcDialect> dialect = dialectImplOptional.map(JdbcDialects::register)
+ .orElseGet(() -> JdbcDialects.get(jdbcUrl));
checkState(dialect.isPresent(), "Cannot handle such jdbc url: " + jdbcUrl);
-
checkAllOrNone(config, new ConfigOption[]{USERNAME, PASSWORD});
-
checkAllOrNone(
config,
new ConfigOption[]{
@@ -321,7 +327,6 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
SCAN_PARTITION_LOWER_BOUND,
SCAN_PARTITION_UPPER_BOUND
});
-
if (config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent()
&& config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) {
long lowerBound = config.get(SCAN_PARTITION_LOWER_BOUND);
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java
new file mode 100644
index 000000000..c1100c6c3
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for {@link GreenplumLoadNode}
+ */
+public class GreenplumLoadSqlParseTest extends AbstractTestBase {
+
+ private MySqlExtractNode buildMySQLExtractNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()));
+ Map<String, String> map = new HashMap<>();
+ return new MySqlExtractNode("1", "mysql_input", fields,
+ null, map, "id",
+ Collections.singletonList("student"), "localhost", "inlong",
+ "inlong", "inlong", null, null,
+ null, null);
+ }
+
+ private Node buildGreenplumLoadNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo())
+ );
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()))
+ );
+ return new GreenplumLoadNode("2", "greenplum_output", fields, relations, null,
+ null, null, null, "jdbc:postgresql://localhost:5432/inlong",
+ "inlong", "inlong", "student", "id");
+ }
+
+ /**
+ * build node relation
+ *
+ * @param inputs extract node
+ * @param outputs load node
+ * @return node relation
+ */
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelation(inputIds, outputIds);
+ }
+
+ /**
+ * Test flink sql task for extract is mysql {@link MySqlExtractNode} and load is mysql {@link GreenplumLoadNode}
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testGreenplumLoadSqlParse() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildMySQLExtractNode();
+ Node outputNode = buildGreenplumLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+
+}