You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/05/31 09:08:05 UTC

[incubator-inlong] branch master updated: [INLONG-4448][Sort] Add Greenplum database data load support #4450

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f25741506 [INLONG-4448][Sort] Add Greenplum database data load support #4450
f25741506 is described below

commit f2574150635a575864cc5545aabd8181b5db55d9
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Tue May 31 17:08:00 2022 +0800

    [INLONG-4448][Sort] Add Greenplum database data load support #4450
---
 .../apache/inlong/sort/protocol/node/LoadNode.java |   4 +-
 .../org/apache/inlong/sort/protocol/node/Node.java |   4 +-
 .../sort/protocol/node/load/GreenplumLoadNode.java | 112 +++++++++++++++++++
 .../protocol/transformation/FunctionParam.java     |   2 +
 .../protocol/node/load/GreenplumLoadNodeTest.java  |  50 +++++++++
 .../inlong/sort/jdbc/dialect/GreenplumDialect.java |  34 ++++++
 .../inlong/sort/jdbc/table/JdbcDialects.java       |  17 ++-
 .../sort/jdbc/table/JdbcDynamicTableFactory.java   |  19 ++--
 .../sort/parser/GreenplumLoadSqlParseTest.java     | 124 +++++++++++++++++++++
 9 files changed, 355 insertions(+), 11 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 283e5e5ac..6192e1c53 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -30,6 +30,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
@@ -60,7 +61,8 @@ import java.util.Map;
         @JsonSubTypes.Type(value = ClickHouseLoadNode.class, name = "clickHouseLoad"),
         @JsonSubTypes.Type(value = SqlServerLoadNode.class, name = "sqlserverLoad"),
         @JsonSubTypes.Type(value = TDSQLPostgresLoadNode.class, name = "tdsqlPostgresLoad"),
-        @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad")
+        @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad"),
+        @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad")
 })
 @NoArgsConstructor
 @Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 0c62e0d37..7376d7062 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -32,6 +32,7 @@ import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
 import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
@@ -75,7 +76,8 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = SqlServerLoadNode.class, name = "sqlserverLoad"),
         @JsonSubTypes.Type(value = TDSQLPostgresLoadNode.class, name = "tdsqlPostgresLoad"),
         @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad"),
-        @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad")
+        @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
+        @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad")
 })
 public interface Node {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/GreenplumLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/GreenplumLoadNode.java
new file mode 100644
index 000000000..0526b014d
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/GreenplumLoadNode.java
@@ -0,0 +1,112 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.PostgresConstant;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Greenplum load node can load data into Greenplum
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("greenplumLoad")
+@Data
+@NoArgsConstructor
+public class GreenplumLoadNode extends LoadNode implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * jdbc:postgresql://host:port/database
+     */
+    @JsonProperty("url")
+    private String url;
+    @JsonProperty("username")
+    private String username;
+    @JsonProperty("password")
+    private String password;
+    @JsonProperty("tableName")
+    private String tableName;
+    /**
+     * Please declare primary key for sink table when query contains update/delete record if your version support
+     * upsert. You can change source stream mode is "append" if your version can't support upsert.
+     */
+    @JsonProperty("primaryKey")
+    private String primaryKey;
+
+    @JsonCreator
+    public GreenplumLoadNode(
+            @JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @JsonProperty("url") String url,
+            @JsonProperty("username") String username,
+            @JsonProperty("password") String password,
+            @JsonProperty("tableName") String tableName,
+            @JsonProperty("primaryKey") String primaryKey) {
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
+        this.url = Preconditions.checkNotNull(url, "url is null");
+        this.username = Preconditions.checkNotNull(username, "username is null");
+        this.password = Preconditions.checkNotNull(password, "password is null");
+        this.tableName = Preconditions.checkNotNull(tableName, "tableName is null");
+        this.primaryKey = primaryKey;
+    }
+
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> options = super.tableOptions();
+        options.put(PostgresConstant.CONNECTOR, PostgresConstant.JDBC_INLONG);
+        options.put(PostgresConstant.URL, url);
+        options.put("dialect-impl", "org.apache.inlong.sort.jdbc.dialect.GreenplumDialect");
+        options.put(PostgresConstant.USERNAME, username);
+        options.put(PostgresConstant.PASSWORD, password);
+        options.put(PostgresConstant.TABLE_NAME, tableName);
+        return options;
+    }
+
+    @Override
+    public String genTableName() {
+        return String.format("table_%s", super.getId());
+    }
+
+    @Override
+    public String getPrimaryKey() {
+        return primaryKey;
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
index cca2fb755..b1ef83e54 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
@@ -22,6 +22,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSub
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
 import org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
 import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
 import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
@@ -63,6 +64,7 @@ import org.apache.inlong.sort.protocol.transformation.operator.OrOperator;
 @JsonSubTypes({
         @JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
         @JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin"),
+        @JsonSubTypes.Type(value = MetaFieldInfo.class, name = "metaField"),
         @JsonSubTypes.Type(value = ConstantParam.class, name = "constant"),
         @JsonSubTypes.Type(value = TimeUnitConstantParam.class, name = "timeUnitConstant"),
         @JsonSubTypes.Type(value = StringConstantParam.class, name = "stringConstant"),
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/GreenplumLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/GreenplumLoadNodeTest.java
new file mode 100644
index 000000000..38c4b0dd5
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/GreenplumLoadNodeTest.java
@@ -0,0 +1,50 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.Collections;
+
+/**
+ * Test for {@link GreenplumLoadNode}
+ */
+public class GreenplumLoadNodeTest extends SerializeBaseTest<GreenplumLoadNode> {
+
+    /**
+     * Get test object
+     *
+     * @return The test object
+     */
+    @Override
+    public GreenplumLoadNode getTestObject() {
+        return new GreenplumLoadNode("1", "greenplum_output",
+                Collections.singletonList(new FieldInfo("name", new StringFormatInfo())),
+                Collections.singletonList(
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo()))
+                ),
+                null, null, 1, null,
+                "jdbc:postgresql://localhost:5432/inlong", "inlong",
+                "inlong", "student", "name");
+    }
+}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/GreenplumDialect.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/GreenplumDialect.java
new file mode 100644
index 000000000..4e3289264
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/GreenplumDialect.java
@@ -0,0 +1,34 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect;
+
+import org.apache.flink.connector.jdbc.dialect.PostgresDialect;
+
+import java.util.Optional;
+
+/**
+ * JDBC dialect for Greenplum.
+ */
+public class GreenplumDialect extends PostgresDialect {
+
+    @Override
+    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        return Optional.empty();
+    }
+}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
index e62cf5b2c..9de7f1c0d 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
@@ -25,7 +25,9 @@ import org.apache.inlong.sort.jdbc.dialect.SqlServerDialect;
 import org.apache.inlong.sort.jdbc.dialect.TDSQLPostgresDialect;
 
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -35,6 +37,8 @@ public final class JdbcDialects {
 
     private static final List<JdbcDialect> DIALECTS = new ArrayList<>();
 
+    private static final Map<String, JdbcDialect> CUSTOM_DIALECTS = new LinkedHashMap<>();
+
     static {
         DIALECTS.add(new MySQLDialect());
         DIALECTS.add(new TDSQLPostgresDialect());
@@ -54,13 +58,22 @@ public final class JdbcDialects {
         return Optional.empty();
     }
 
+    public static Optional<JdbcDialect> getCustomDialect(String dialectImpl) {
+        JdbcDialect jdbcDialect = CUSTOM_DIALECTS.get(dialectImpl);
+        if (jdbcDialect != null) {
+            return Optional.of(jdbcDialect);
+        }
+        return Optional.empty();
+    }
+
     /**
      * Fetch the JdbcDialect class corresponding to a given database url.
      */
-    public static void register(String dialectImpl) {
+    public static Optional<JdbcDialect> register(String dialectImpl) {
         try {
             JdbcDialect dialect = (JdbcDialect) Class.forName(dialectImpl).newInstance();
-            DIALECTS.add(dialect);
+            CUSTOM_DIALECTS.put(dialectImpl, dialect);
+            return Optional.of(dialect);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
             throw new IllegalArgumentException("Cannot register such dialect impl: " + dialectImpl, e);
         }
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index 006550ba1..1aa99c21b 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -205,11 +205,18 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
 
     private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
         final String url = readableConfig.get(URL);
+        Optional<String> dialectImplOptional = readableConfig.getOptional(DIALECT_IMPL);
+        Optional<JdbcDialect> jdbcDialect;
+        if (dialectImplOptional.isPresent()) {
+            jdbcDialect = JdbcDialects.getCustomDialect(dialectImplOptional.get());
+        } else {
+            jdbcDialect = JdbcDialects.get(url);
+        }
         final JdbcOptions.Builder builder =
                 JdbcOptions.builder()
                         .setDBUrl(url)
                         .setTableName(readableConfig.get(TABLE_NAME))
-                        .setDialect(JdbcDialects.get(url).get())
+                        .setDialect(jdbcDialect.get())
                         .setParallelism(
                                 readableConfig
                                         .getOptional(FactoryUtil.SINK_PARALLELISM)
@@ -305,14 +312,13 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
     }
 
     private void validateConfigOptions(ReadableConfig config) {
-        // register custom dialect first
-        config.getOptional(DIALECT_IMPL).ifPresent(JdbcDialects::register);
+        // Register custom dialect first
+        Optional<String> dialectImplOptional = config.getOptional(DIALECT_IMPL);
         String jdbcUrl = config.get(URL);
-        final Optional<JdbcDialect> dialect = JdbcDialects.get(jdbcUrl);
+        final Optional<JdbcDialect> dialect = dialectImplOptional.map(JdbcDialects::register)
+                .orElseGet(() -> JdbcDialects.get(jdbcUrl));
         checkState(dialect.isPresent(), "Cannot handle such jdbc url: " + jdbcUrl);
-
         checkAllOrNone(config, new ConfigOption[]{USERNAME, PASSWORD});
-
         checkAllOrNone(
                 config,
                 new ConfigOption[]{
@@ -321,7 +327,6 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
                         SCAN_PARTITION_LOWER_BOUND,
                         SCAN_PARTITION_UPPER_BOUND
                 });
-
         if (config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent()
                 && config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) {
             long lowerBound = config.get(SCAN_PARTITION_LOWER_BOUND);
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java
new file mode 100644
index 000000000..c1100c6c3
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java
@@ -0,0 +1,124 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for {@link GreenplumLoadNode}
+ */
+public class GreenplumLoadSqlParseTest extends AbstractTestBase {
+
+    private MySqlExtractNode buildMySQLExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()));
+        Map<String, String> map = new HashMap<>();
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("student"), "localhost", "inlong",
+                "inlong", "inlong", null, null,
+                null, null);
+    }
+
+    private Node buildGreenplumLoadNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo())
+        );
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+                                new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo()))
+                );
+        return new GreenplumLoadNode("2", "greenplum_output", fields, relations, null,
+                null, null, null, "jdbc:postgresql://localhost:5432/inlong",
+                "inlong", "inlong", "student", "id");
+    }
+
+    /**
+     * build node relation
+     *
+     * @param inputs  extract node
+     * @param outputs load node
+     * @return node relation
+     */
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    /**
+     * Test flink sql task for extract is mysql {@link MySqlExtractNode} and load is mysql {@link GreenplumLoadNode}
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testGreenplumLoadSqlParse() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMySQLExtractNode();
+        Node outputNode = buildGreenplumLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
+}