Posted to commits@inlong.apache.org by do...@apache.org on 2022/05/30 05:00:58 UTC

[incubator-inlong] branch master updated: [INLONG-4394][Sort] Add Oracle data load support (#4413)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 323e695e4 [INLONG-4394][Sort] Add Oracle data load support (#4413)
323e695e4 is described below

commit 323e695e4de8e21001d404d3073a948a27f280f9
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Mon May 30 13:00:53 2022 +0800

    [INLONG-4394][Sort] Add Oracle data load support (#4413)
---
 .../apache/inlong/sort/protocol/node/LoadNode.java |   6 +-
 .../org/apache/inlong/sort/protocol/node/Node.java |   4 +-
 .../sort/protocol/node/load/OracleLoadNode.java    | 102 ++++++++
 .../protocol/node/load/OracleLoadNodeTest.java     |  46 ++++
 inlong-sort/sort-connectors/jdbc/pom.xml           |  12 +
 .../jdbc/converter/AbstractJdbcRowConverter.java   | 283 +++++++++++++++++++++
 .../clickhouse/ClickHouseRowConverter.java         |   7 +-
 .../jdbc/converter/oracle/OracleRowConverter.java  | 181 +++++++++++++
 .../sqlserver/SqlServerRowConvert.java             |   6 +-
 .../sort/jdbc/dialect/ClickHouseDialect.java       |   2 +-
 .../inlong/sort/jdbc/dialect/OracleDialect.java    | 175 +++++++++++++
 .../inlong/sort/jdbc/dialect/SqlServerDialect.java |   2 +-
 .../inlong/sort/jdbc/table/JdbcDialects.java       |   2 +
 .../inlong/sort/parser/OracleLoadSqlParseTest.java | 124 +++++++++
 pom.xml                                            |  23 ++
 15 files changed, 963 insertions(+), 12 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 98d3324b8..6fbfc24a8 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -34,6 +34,7 @@ import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
+import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
 import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
 import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
@@ -60,7 +61,8 @@ import java.util.Map;
         @JsonSubTypes.Type(value = ClickHouseLoadNode.class, name = "clickHouseLoad"),
         @JsonSubTypes.Type(value = SqlServerLoadNode.class, name = "sqlserverLoad"),
         @JsonSubTypes.Type(value = TDSQLPostgresLoadNode.class, name = "tdsqlPostgresLoad"),
-        @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad")
+        @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad"),
+        @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad")
 })
 @NoArgsConstructor
 @Data
@@ -98,7 +100,7 @@ public abstract class LoadNode implements Node {
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
             @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
-            @JsonProperty("properties") Map<String, String> properties) {
+            @Nullable @JsonProperty("properties") Map<String, String> properties) {
         this.id = Preconditions.checkNotNull(id, "id is null");
         this.name = name;
         this.fields = Preconditions.checkNotNull(fields, "fields is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 470901782..0c62e0d37 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -36,6 +36,7 @@ import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
+import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
 import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
 import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
@@ -73,7 +74,8 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = ClickHouseLoadNode.class, name = "clickHouseLoad"),
         @JsonSubTypes.Type(value = SqlServerLoadNode.class, name = "sqlserverLoad"),
         @JsonSubTypes.Type(value = TDSQLPostgresLoadNode.class, name = "tdsqlPostgresLoad"),
-        @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad")
+        @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad"),
+        @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad")
 })
 public interface Node {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java
new file mode 100644
index 000000000..c725c07e1
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Oracle load node can load data into Oracle
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("oracleLoad")
+@Data
+@NoArgsConstructor
+public class OracleLoadNode extends LoadNode implements Serializable {
+    /**
+     * jdbc:oracle:thin:@host:port:sid (e.g. jdbc:oracle:thin:@localhost:1521:xe)
+     */
+    @JsonProperty("url")
+    private String url;
+    @JsonProperty("username")
+    private String username;
+    @JsonProperty("password")
+    private String password;
+    @JsonProperty("tableName")
+    private String tableName;
+    @JsonProperty("primaryKey")
+    private String primaryKey;
+
+    @JsonCreator
+    public OracleLoadNode(
+            @JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @JsonProperty("url") String url,
+            @JsonProperty("username") String username,
+            @JsonProperty("password") String password,
+            @JsonProperty("tableName") String tableName,
+            @JsonProperty("primaryKey") String primaryKey) {
+        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        this.url = Preconditions.checkNotNull(url, "url is null");
+        this.username = Preconditions.checkNotNull(username, "username is null");
+        this.password = Preconditions.checkNotNull(password, "password is null");
+        this.tableName = Preconditions.checkNotNull(tableName, "tableName is null");
+        this.primaryKey = primaryKey;
+    }
+
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> options = super.tableOptions();
+        options.put("connector", "jdbc-inlong");
+        options.put("url", url);
+        options.put("username", username);
+        options.put("password", password);
+        options.put("table-name", tableName);
+        return options;
+    }
+
+    @Override
+    public String genTableName() {
+        return String.format("table_%s", super.getId());
+    }
+
+    @Override
+    public String getPrimaryKey() {
+        return primaryKey;
+    }
+}
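
A note on the class above: tableOptions() is what wires an OracleLoadNode into the
generated Flink DDL. A minimal sketch of constructing the node and the options it yields
(argument values here are illustrative, mirroring the unit test that follows; any extra
options contributed by LoadNode.tableOptions() depend on the base class):

    OracleLoadNode node = new OracleLoadNode(
            "2", "oracle_output",
            Collections.singletonList(new FieldInfo("NAME", new StringFormatInfo())),
            Collections.singletonList(new FieldRelationShip(
                    new FieldInfo("name", new StringFormatInfo()),
                    new FieldInfo("NAME", new StringFormatInfo()))),
            null, null, null, null,
            "jdbc:oracle:thin:@localhost:1521:xe", // url
            "inlong",                              // username
            "inlong",                              // password
            "STUDENT",                             // tableName
            "ID");                                 // primaryKey
    Map<String, String> options = node.tableOptions();
    // options now contains at least:
    //   connector  -> jdbc-inlong
    //   url        -> jdbc:oracle:thin:@localhost:1521:xe
    //   username   -> inlong
    //   password   -> inlong
    //   table-name -> STUDENT
    // and node.genTableName() returns "table_2".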
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java
new file mode 100644
index 000000000..ca972fe23
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java
@@ -0,0 +1,46 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+
+import java.util.Collections;
+
+/**
+ * Test for {@link OracleLoadNode}
+ */
+public class OracleLoadNodeTest extends SerializeBaseTest<OracleLoadNode> {
+
+    @Override
+    public OracleLoadNode getTestObject() {
+        return new OracleLoadNode("1", "mysql_output",
+                Collections.singletonList(new FieldInfo("NAME", new StringFormatInfo())),
+                Collections.singletonList(new FieldRelationShip(new FieldInfo("name",
+                        new StringFormatInfo()), new FieldInfo("NAME", new StringFormatInfo()))),
+                null, null, 1, null,
+                "jdbc:oracle:thin:@localhost:1521:xe",
+                "inlong",
+                "inlong",
+                "user",
+                "name");
+    }
+}
diff --git a/inlong-sort/sort-connectors/jdbc/pom.xml b/inlong-sort/sort-connectors/jdbc/pom.xml
index a4fad240b..86b3610c7 100644
--- a/inlong-sort/sort-connectors/jdbc/pom.xml
+++ b/inlong-sort/sort-connectors/jdbc/pom.xml
@@ -43,6 +43,16 @@
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
         </dependency>
+        <!--for oracle-->
+        <dependency>
+            <groupId>com.oracle.database.jdbc</groupId>
+            <artifactId>ojdbc8</artifactId>
+        </dependency>
+        <!--for mysql-->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
         <!--for jdbc-->
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -74,6 +84,8 @@
                                     <include>org.apache.flink:flink-connector-jdbc_${flink.scala.binary.version}</include>
                                     <include>org.postgresql:postgresql</include>
                                     <include>ru.yandex.clickhouse:clickhouse-jdbc</include>
+                                    <include>mysql:mysql-connector-java</include>
+                                    <include>com.oracle.database.jdbc:*</include>
                                 </includes>
                             </artifactSet>
                         </configuration>
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java
new file mode 100644
index 000000000..9a800b163
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java
@@ -0,0 +1,283 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.converter;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all converters that convert between JDBC object and Flink internal object.
+ * Copy of {@link org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter}
+ * that makes JdbcDeserializationConverter and JdbcSerializationConverter public so that
+ * subclasses can override createInternalConverter and createExternalConverter.
+ */
+public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
+
+    protected final RowType rowType;
+    protected final JdbcDeserializationConverter[] toInternalConverters;
+    protected final JdbcSerializationConverter[] toExternalConverters;
+    protected final LogicalType[] fieldTypes;
+
+    public AbstractJdbcRowConverter(RowType rowType) {
+        this.rowType = checkNotNull(rowType);
+        this.fieldTypes =
+                rowType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .toArray(LogicalType[]::new);
+        this.toInternalConverters = new JdbcDeserializationConverter[rowType.getFieldCount()];
+        this.toExternalConverters = new JdbcSerializationConverter[rowType.getFieldCount()];
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            toInternalConverters[i] = createNullableInternalConverter(rowType.getTypeAt(i));
+            toExternalConverters[i] = createNullableExternalConverter(fieldTypes[i]);
+        }
+    }
+
+    public abstract String converterName();
+
+    @Override
+    public RowData toInternal(ResultSet resultSet) throws SQLException {
+        GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
+        for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
+            Object field = resultSet.getObject(pos + 1);
+            genericRowData.setField(pos, toInternalConverters[pos].deserialize(field));
+        }
+        return genericRowData;
+    }
+
+    @Override
+    public FieldNamedPreparedStatement toExternal(
+            RowData rowData, FieldNamedPreparedStatement statement) throws SQLException {
+        for (int index = 0; index < rowData.getArity(); index++) {
+            toExternalConverters[index].serialize(rowData, index, statement);
+        }
+        return statement;
+    }
+
+    /**
+     * Create a nullable runtime {@link JdbcDeserializationConverter} from given {@link
+     * LogicalType}.
+     */
+    protected JdbcDeserializationConverter createNullableInternalConverter(LogicalType type) {
+        return wrapIntoNullableInternalConverter(createInternalConverter(type));
+    }
+
+    protected JdbcDeserializationConverter wrapIntoNullableInternalConverter(
+            JdbcDeserializationConverter jdbcDeserializationConverter) {
+        return val -> {
+            if (val == null) {
+                return null;
+            } else {
+                return jdbcDeserializationConverter.deserialize(val);
+            }
+        };
+    }
+
+    protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return val -> null;
+            case BOOLEAN:
+            case FLOAT:
+            case DOUBLE:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+                return val -> val;
+            case TINYINT:
+                return val -> ((Integer) val).byteValue();
+            case SMALLINT:
+                // Converter for SMALLINT that casts the value to int and then returns a short,
+                // since JDBC 1.0 uses the int type for small values.
+                return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
+            case INTEGER:
+                return val -> val;
+            case BIGINT:
+                return val -> val;
+            case DECIMAL:
+                final int precision = ((DecimalType) type).getPrecision();
+                final int scale = ((DecimalType) type).getScale();
+                // Use decimal(20, 0) to support the db type bigint unsigned: users should define
+                // decimal(20, 0) in SQL, but other precisions like decimal(30, 0) also work,
+                // as a lenient fallback.
+                return val ->
+                        val instanceof BigInteger
+                                ? DecimalData.fromBigDecimal(
+                                new BigDecimal((BigInteger) val, 0), precision, scale)
+                                : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
+            case DATE:
+                return val -> (int) (((Date) val).toLocalDate().toEpochDay());
+            case TIME_WITHOUT_TIME_ZONE:
+                return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return val ->
+                        val instanceof LocalDateTime
+                                ? TimestampData.fromLocalDateTime((LocalDateTime) val)
+                                : TimestampData.fromTimestamp((Timestamp) val);
+            case CHAR:
+            case VARCHAR:
+                return val -> StringData.fromString((String) val);
+            case BINARY:
+            case VARBINARY:
+                return val -> (byte[]) val;
+            case ARRAY:
+            case ROW:
+            case MAP:
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type:" + type);
+        }
+    }
+
+    /**
+     * Create a nullable {@link JdbcSerializationConverter} from the given SQL type.
+     */
+    protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) {
+        return wrapIntoNullableExternalConverter(createExternalConverter(type), type);
+    }
+
+    protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
+            JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
+        final int sqlType =
+                JdbcTypeUtil.typeInformationToSqlType(
+                        TypeConversions.fromDataTypeToLegacyInfo(
+                                TypeConversions.fromLogicalToDataType(type)));
+        return (val, index, statement) -> {
+            if (val == null
+                    || val.isNullAt(index)
+                    || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
+                statement.setNull(index, sqlType);
+            } else {
+                jdbcSerializationConverter.serialize(val, index, statement);
+            }
+        };
+    }
+
+    protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case BOOLEAN:
+                return (val, index, statement) ->
+                        statement.setBoolean(index, val.getBoolean(index));
+            case TINYINT:
+                return (val, index, statement) -> statement.setByte(index, val.getByte(index));
+            case SMALLINT:
+                return (val, index, statement) -> statement.setShort(index, val.getShort(index));
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return (val, index, statement) -> statement.setInt(index, val.getInt(index));
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return (val, index, statement) -> statement.setLong(index, val.getLong(index));
+            case FLOAT:
+                return (val, index, statement) -> statement.setFloat(index, val.getFloat(index));
+            case DOUBLE:
+                return (val, index, statement) -> statement.setDouble(index, val.getDouble(index));
+            case CHAR:
+            case VARCHAR:
+                // value is BinaryString
+                return (val, index, statement) ->
+                        statement.setString(index, val.getString(index).toString());
+            case BINARY:
+            case VARBINARY:
+                return (val, index, statement) -> statement.setBytes(index, val.getBinary(index));
+            case DATE:
+                return (val, index, statement) ->
+                        statement.setDate(
+                                index, Date.valueOf(LocalDate.ofEpochDay(val.getInt(index))));
+            case TIME_WITHOUT_TIME_ZONE:
+                return (val, index, statement) ->
+                        statement.setTime(
+                                index,
+                                Time.valueOf(
+                                        LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L)));
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                final int timestampPrecision = ((TimestampType) type).getPrecision();
+                return (val, index, statement) ->
+                        statement.setTimestamp(
+                                index, val.getTimestamp(index, timestampPrecision).toTimestamp());
+            case DECIMAL:
+                final int decimalPrecision = ((DecimalType) type).getPrecision();
+                final int decimalScale = ((DecimalType) type).getScale();
+                return (val, index, statement) ->
+                        statement.setBigDecimal(
+                                index,
+                                val.getDecimal(index, decimalPrecision, decimalScale)
+                                        .toBigDecimal());
+            case ARRAY:
+            case MAP:
+            case MULTISET:
+            case ROW:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type:" + type);
+        }
+    }
+
+    /**
+     * Runtime converter to convert JDBC field to {@link RowData} type object.
+     */
+    @FunctionalInterface
+    public interface JdbcDeserializationConverter extends Serializable {
+        /**
+         * Convert a JDBC field object read from a {@link ResultSet} to the internal data structure object.
+         *
+         * @param jdbcField the field object read from the {@link ResultSet}
+         */
+        Object deserialize(Object jdbcField) throws SQLException;
+    }
+
+    /**
+     * Runtime converter to convert {@link RowData} field to java object and fill into the {@link
+     * PreparedStatement}.
+     */
+    @FunctionalInterface
+    public interface JdbcSerializationConverter extends Serializable {
+        void serialize(RowData rowData, int index, FieldNamedPreparedStatement statement)
+                throws SQLException;
+    }
+}
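
The reason JdbcDeserializationConverter and JdbcSerializationConverter are public here is
so that a dialect-specific converter can intercept only the types it cares about and defer
the rest to this base class, exactly as the Oracle converter below does. A minimal sketch
of such a subclass (the dialect name is hypothetical):

    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.inlong.sort.jdbc.converter.AbstractJdbcRowConverter;

    public class MyDialectRowConverter extends AbstractJdbcRowConverter {

        public MyDialectRowConverter(RowType rowType) {
            super(rowType);
        }

        @Override
        public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
            switch (type.getTypeRoot()) {
                case BOOLEAN:
                    // Dialect-specific handling, e.g. a driver that returns numbers for booleans
                    return val -> val instanceof Number ? ((Number) val).intValue() != 0 : val;
                default:
                    // Everything else falls back to the shared logic in AbstractJdbcRowConverter
                    return super.createInternalConverter(type);
            }
        }

        @Override
        public String converterName() {
            return "MyDialect";
        }
    }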
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/clickhouse/ClickHouseRowConverter.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/clickhouse/ClickHouseRowConverter.java
similarity index 87%
rename from inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/clickhouse/ClickHouseRowConverter.java
rename to inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/clickhouse/ClickHouseRowConverter.java
index 198f22e77..4046c7b6a 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/clickhouse/ClickHouseRowConverter.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/clickhouse/ClickHouseRowConverter.java
@@ -16,14 +16,13 @@
  *  limitations under the License.
  */
 
-package org.apache.inlong.sort.jdbc.clickhouse;
+package org.apache.inlong.sort.jdbc.converter.clickhouse;
 
-import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.converter.AbstractJdbcRowConverter;
 
 /**
- * Runtime converter that responsible to convert between JDBC object and Flink internal object for
- * Derby.
+ * Runtime converter responsible for converting between JDBC objects and Flink internal objects for ClickHouse.
  */
 public class ClickHouseRowConverter extends AbstractJdbcRowConverter {
 
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/oracle/OracleRowConverter.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/oracle/OracleRowConverter.java
new file mode 100644
index 000000000..595b57456
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/oracle/OracleRowConverter.java
@@ -0,0 +1,181 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.converter.oracle;
+
+import oracle.jdbc.internal.OracleBlob;
+import oracle.jdbc.internal.OracleClob;
+import oracle.sql.BINARY_DOUBLE;
+import oracle.sql.BINARY_FLOAT;
+import oracle.sql.CHAR;
+import oracle.sql.DATE;
+import oracle.sql.NUMBER;
+import oracle.sql.RAW;
+import oracle.sql.TIMESTAMP;
+import oracle.sql.TIMESTAMPTZ;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.converter.AbstractJdbcRowConverter;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+
+/**
+ * Runtime converter responsible for converting between JDBC objects and Flink internal objects for Oracle.
+ */
+public class OracleRowConverter extends AbstractJdbcRowConverter {
+
+    private static final long serialVersionUID = 1L;
+
+    public OracleRowConverter(RowType rowType) {
+        super(rowType);
+    }
+
+    @Override
+    public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return val -> null;
+            case BOOLEAN:
+                return val -> val instanceof NUMBER ? ((NUMBER) val).booleanValue() : val;
+            case FLOAT:
+                return val ->
+                        val instanceof NUMBER
+                                ? ((NUMBER) val).floatValue()
+                                : val instanceof BINARY_FLOAT
+                                ? ((BINARY_FLOAT) val).floatValue()
+                                : val instanceof BigDecimal
+                                ? ((BigDecimal) val).floatValue()
+                                : val;
+            case DOUBLE:
+                return val ->
+                        val instanceof NUMBER
+                                ? ((NUMBER) val).doubleValue()
+                                : val instanceof BINARY_DOUBLE
+                                ? ((BINARY_DOUBLE) val).doubleValue()
+                                : val instanceof BigDecimal
+                                ? ((BigDecimal) val).doubleValue()
+                                : val;
+            case TINYINT:
+                return val ->
+                        val instanceof NUMBER
+                                ? ((NUMBER) val).byteValue()
+                                : val instanceof BigDecimal ? ((BigDecimal) val).byteValue() : val;
+            case SMALLINT:
+                return val ->
+                        val instanceof NUMBER
+                                ? ((NUMBER) val).shortValue()
+                                : val instanceof BigDecimal ? ((BigDecimal) val).shortValue() : val;
+            case INTEGER:
+                return val ->
+                        val instanceof NUMBER
+                                ? ((NUMBER) val).intValue()
+                                : val instanceof BigDecimal ? ((BigDecimal) val).intValue() : val;
+            case BIGINT:
+                return val ->
+                        val instanceof NUMBER
+                                ? ((NUMBER) val).longValue()
+                                : val instanceof BigDecimal ? ((BigDecimal) val).longValue() : val;
+            case DECIMAL:
+                final int precision = ((DecimalType) type).getPrecision();
+                final int scale = ((DecimalType) type).getScale();
+                return val ->
+                        val instanceof BigInteger
+                                ? DecimalData.fromBigDecimal(
+                                new BigDecimal((BigInteger) val, 0), precision, scale)
+                                : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
+            case CHAR:
+            case VARCHAR:
+                return val ->
+                        (val instanceof CHAR)
+                                ? StringData.fromString(((CHAR) val).getString())
+                                : (val instanceof OracleClob)
+                                ? StringData.fromString(((OracleClob) val).stringValue())
+                                : StringData.fromString((String) val);
+            case BINARY:
+            case VARBINARY:
+            case RAW:
+                return val ->
+                        val instanceof RAW
+                                ? ((RAW) val).getBytes()
+                                : val instanceof OracleBlob
+                                ? ((OracleBlob) val)
+                                .getBytes(1, (int) ((OracleBlob) val).length())
+                                : val.toString().getBytes();
+
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+                return val -> val instanceof NUMBER ? ((NUMBER) val).intValue() : val;
+            case DATE:
+                return val ->
+                        val instanceof DATE
+                                ? (int) (((DATE) val).dateValue().toLocalDate().toEpochDay())
+                                : val instanceof Timestamp
+                                ? (int)
+                                (((Timestamp) val)
+                                        .toLocalDateTime()
+                                        .toLocalDate()
+                                        .toEpochDay())
+                                : (int) (((Date) val).toLocalDate().toEpochDay());
+            case TIME_WITHOUT_TIME_ZONE:
+                return val ->
+                        val instanceof DATE
+                                ? (int)
+                                (((DATE) val).timeValue().toLocalTime().toNanoOfDay()
+                                        / 1_000_000L)
+                                : (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return val ->
+                        val instanceof TIMESTAMP
+                                ? TimestampData.fromTimestamp(((TIMESTAMP) val).timestampValue())
+                                : TimestampData.fromTimestamp((Timestamp) val);
+            case TIMESTAMP_WITH_TIME_ZONE:
+                return val -> {
+                    if (val instanceof TIMESTAMPTZ) {
+                        final TIMESTAMPTZ ts = (TIMESTAMPTZ) val;
+                        final ZonedDateTime zdt =
+                                ZonedDateTime.ofInstant(
+                                        ts.timestampValue().toInstant(),
+                                        ts.getTimeZone().toZoneId());
+                        return TimestampData.fromLocalDateTime(zdt.toLocalDateTime());
+                    } else {
+                        return TimestampData.fromTimestamp((Timestamp) val);
+                    }
+                };
+            case ARRAY:
+            case ROW:
+            case MAP:
+            case MULTISET:
+            default:
+                return super.createInternalConverter(type);
+        }
+    }
+
+    @Override
+    public String converterName() {
+        return "Oracle";
+    }
+}
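
Note how each branch accepts both the oracle.sql wrapper type and the plain JDBC type, so
the converter works regardless of whether the driver hands back, say, a NUMBER or a
BigDecimal for an INTEGER column. A quick illustration (a sketch, assuming the ojdbc8
driver is on the classpath and the enclosing code declares SQLException):

    AbstractJdbcRowConverter.JdbcDeserializationConverter intConverter =
            new OracleRowConverter(RowType.of(new IntType()))
                    .createInternalConverter(new IntType());
    // Both inputs deserialize to the same internal Integer value:
    Object a = intConverter.deserialize(new java.math.BigDecimal("42")); // -> 42
    Object b = intConverter.deserialize(new oracle.sql.NUMBER(42));      // -> 42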
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/sqlserver/SqlServerRowConvert.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/sqlserver/SqlServerRowConvert.java
similarity index 89%
rename from inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/sqlserver/SqlServerRowConvert.java
rename to inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/sqlserver/SqlServerRowConvert.java
index a355d6b4e..3900fe6c9 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/sqlserver/SqlServerRowConvert.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/sqlserver/SqlServerRowConvert.java
@@ -16,17 +16,17 @@
  *
  */
 
-package org.apache.inlong.sort.jdbc.sqlserver;
+package org.apache.inlong.sort.jdbc.converter.sqlserver;
 
-import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.converter.AbstractJdbcRowConverter;
 
 /**
  * Runtime converter responsible for converting between JDBC objects and Flink internal objects for
  * SqlServer.
  */
 public class SqlServerRowConvert extends AbstractJdbcRowConverter {
-    
+
     public SqlServerRowConvert(RowType rowType) {
         super(rowType);
     }
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java
index 781f0b078..9216f6485 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java
@@ -21,7 +21,7 @@ package org.apache.inlong.sort.jdbc.dialect;
 import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.inlong.sort.jdbc.clickhouse.ClickHouseRowConverter;
+import org.apache.inlong.sort.jdbc.converter.clickhouse.ClickHouseRowConverter;
 import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
 
 import java.util.Arrays;
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/OracleDialect.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/OracleDialect.java
new file mode 100644
index 000000000..b158bf1c4
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/OracleDialect.java
@@ -0,0 +1,175 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.converter.oracle.OracleRowConverter;
+import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * JDBC dialect for Oracle.
+ */
+public class OracleDialect extends AbstractJdbcDialect {
+
+    private static final long serialVersionUID = 1L;
+
+    // Define MAX/MIN precision of TIMESTAMP type according to Oracle docs:
+    // https://www.techonthenet.com/oracle/datatypes.php
+    private static final int MAX_TIMESTAMP_PRECISION = 9;
+    private static final int MIN_TIMESTAMP_PRECISION = 1;
+
+    // Define MAX/MIN precision of DECIMAL type according to Oracle docs:
+    // https://www.techonthenet.com/oracle/datatypes.php
+    private static final int MAX_DECIMAL_PRECISION = 38;
+    private static final int MIN_DECIMAL_PRECISION = 1;
+
+    @Override
+    public JdbcRowConverter getRowConverter(RowType rowType) {
+        return new OracleRowConverter(rowType);
+    }
+
+    @Override
+    public String getLimitClause(long limit) {
+        return "FETCH FIRST " + limit + " ROWS ONLY";
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("oracle.jdbc.OracleDriver");
+    }
+
+    @Override
+    public String dialectName() {
+        return "Oracle";
+    }
+
+    @Override
+    public boolean canHandle(String url) {
+        return url.startsWith("jdbc:oracle:");
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return identifier;
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(
+            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+
+        String sourceFields =
+                Arrays.stream(fieldNames)
+                        .map(f -> ":" + f + " " + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+
+        String onClause =
+                Arrays.stream(uniqueKeyFields)
+                        .map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f))
+                        .collect(Collectors.joining(" and "));
+
+        final Set<String> uniqueKeyFieldsSet =
+                Arrays.stream(uniqueKeyFields).collect(Collectors.toSet());
+        String updateClause =
+                Arrays.stream(fieldNames)
+                        .filter(f -> !uniqueKeyFieldsSet.contains(f))
+                        .map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+
+        String insertFields =
+                Arrays.stream(fieldNames)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+
+        String valuesClause =
+                Arrays.stream(fieldNames)
+                        .map(f -> "s." + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+
+        // If we cannot split the schema from the table name, it is risky to call
+        // quoteIdentifier(tableName); for example, [tbo].[sometable] is ok but [tbo.sometable] is not.
+        String mergeQuery =
+                " MERGE INTO "
+                        + tableName
+                        + " t "
+                        + " USING (SELECT "
+                        + sourceFields
+                        + " FROM DUAL) s "
+                        + " ON ("
+                        + onClause
+                        + ") "
+                        + " WHEN MATCHED THEN UPDATE SET "
+                        + updateClause
+                        + " WHEN NOT MATCHED THEN INSERT ("
+                        + insertFields
+                        + ")"
+                        + " VALUES ("
+                        + valuesClause
+                        + ")";
+
+        return Optional.of(mergeQuery);
+    }
+
+    @Override
+    public int maxDecimalPrecision() {
+        return MAX_DECIMAL_PRECISION;
+    }
+
+    @Override
+    public int minDecimalPrecision() {
+        return MIN_DECIMAL_PRECISION;
+    }
+
+    @Override
+    public int maxTimestampPrecision() {
+        return MAX_TIMESTAMP_PRECISION;
+    }
+
+    @Override
+    public int minTimestampPrecision() {
+        return MIN_TIMESTAMP_PRECISION;
+    }
+
+    @Override
+    public List<LogicalTypeRoot> unsupportedTypes() {
+        // The data types used in Oracle are listed at:
+        // https://www.techonthenet.com/oracle/datatypes.php
+        return Arrays.asList(
+                LogicalTypeRoot.BINARY,
+                LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
+                LogicalTypeRoot.INTERVAL_YEAR_MONTH,
+                LogicalTypeRoot.INTERVAL_DAY_TIME,
+                LogicalTypeRoot.MULTISET,
+                LogicalTypeRoot.MAP,
+                LogicalTypeRoot.ROW,
+                LogicalTypeRoot.DISTINCT_TYPE,
+                LogicalTypeRoot.STRUCTURED_TYPE,
+                LogicalTypeRoot.NULL,
+                LogicalTypeRoot.RAW,
+                LogicalTypeRoot.SYMBOL,
+                LogicalTypeRoot.UNRESOLVED);
+    }
+}
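
Tracing getUpsertStatement() with concrete inputs makes the MERGE shape easier to see.
For the student table used by the parse test further below, the generated statement comes
out roughly as follows (whitespace condensed):

    OracleDialect dialect = new OracleDialect();
    String upsert = dialect.getUpsertStatement(
            "student",
            new String[] {"ID", "NAME", "AGE"},
            new String[] {"ID"}).get();
    // MERGE INTO student t
    //   USING (SELECT :ID ID, :NAME NAME, :AGE AGE FROM DUAL) s
    //   ON (t.ID=s.ID)
    //   WHEN MATCHED THEN UPDATE SET t.NAME=s.NAME, t.AGE=s.AGE
    //   WHEN NOT MATCHED THEN INSERT (ID, NAME, AGE)
    //   VALUES (s.ID, s.NAME, s.AGE)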
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/SqlServerDialect.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/SqlServerDialect.java
index 678a980ac..5ca5a5172 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/SqlServerDialect.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/SqlServerDialect.java
@@ -22,7 +22,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.inlong.sort.jdbc.sqlserver.SqlServerRowConvert;
+import org.apache.inlong.sort.jdbc.converter.sqlserver.SqlServerRowConvert;
 import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
 
 import java.util.Arrays;
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
index a9d473661..e62cf5b2c 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.jdbc.table;
 
 import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
 import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
+import org.apache.inlong.sort.jdbc.dialect.OracleDialect;
 import org.apache.inlong.sort.jdbc.dialect.SqlServerDialect;
 import org.apache.inlong.sort.jdbc.dialect.TDSQLPostgresDialect;
 
@@ -38,6 +39,7 @@ public final class JdbcDialects {
         DIALECTS.add(new MySQLDialect());
         DIALECTS.add(new TDSQLPostgresDialect());
         DIALECTS.add(new SqlServerDialect());
+        DIALECTS.add(new OracleDialect());
     }
 
     /**
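
Registering OracleDialect in the static block is what enables URL-based dialect selection.
Assuming this class keeps the lookup method from Flink's JdbcDialects (a get(String url)
that scans the registered dialects with canHandle; the signature is an assumption, as it
is not shown in this diff), selection would work like:

    // Hypothetical lookup, assuming a Flink-style JdbcDialects.get(String) API:
    Optional<JdbcDialect> dialect = JdbcDialects.get("jdbc:oracle:thin:@localhost:1521:xe");
    // OracleDialect.canHandle() matches the "jdbc:oracle:" prefix, so
    // dialect.get().dialectName() returns "Oracle".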
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
new file mode 100644
index 000000000..b7553c183
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
@@ -0,0 +1,124 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for {@link OracleLoadNode}
+ */
+public class OracleLoadSqlParseTest extends AbstractTestBase {
+
+    private MySqlExtractNode buildMySQLExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()));
+        Map<String, String> map = new HashMap<>();
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("student"), "localhost", "inlong",
+                "inlong", "inlong", null, null,
+                null, null);
+    }
+
+    private Node buildOracleLoadNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("ID", new LongFormatInfo()),
+                new FieldInfo("NAME", new StringFormatInfo()),
+                new FieldInfo("AGE", new IntFormatInfo())
+        );
+        List<FieldRelationShip> relations = Arrays
+                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+                                new FieldInfo("ID", new LongFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("NAME", new StringFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("AGE", new IntFormatInfo()))
+                );
+        return new OracleLoadNode("2", "oracle_output", fields, relations, null,
+                null, null, null, "jdbc:oracle:thin:@localhost:1521:xe",
+                "flinkuser", "flinkpw", "student", "ID");
+    }
+
+    /**
+     * Build the node relation
+     *
+     * @param inputs  the extract nodes
+     * @param outputs the load nodes
+     * @return the node relation
+     */
+    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelationShip(inputIds, outputIds);
+    }
+
+    /**
+     * Test the Flink SQL task where the extract node is MySQL ({@link MySqlExtractNode}) and the load node is Oracle ({@link OracleLoadNode})
+     *
+     * @throws Exception Any exception that may be thrown during execution
+     */
+    @Test
+    public void testOracleLoadSqlParse() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMySQLExtractNode();
+        Node outputNode = buildOracleLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index 18f5ee3ef..7331db48e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,8 @@
         <rocksdb.version>6.14.6</rocksdb.version>
         <hadoop.version>2.10.1</hadoop.version>
         <postgresql.version>42.3.4</postgresql.version>
+        <oracle.jdbc.version>19.3.0.0</oracle.jdbc.version>
+        <mysql.jdbc.version>8.0.21</mysql.jdbc.version>
         <mybatis.starter.version>2.1.3</mybatis.starter.version>
         <mybatis.version>3.5.9</mybatis.version>
         <druid.version>1.2.6</druid.version>
@@ -519,6 +521,20 @@
                 <artifactId>postgresql</artifactId>
                 <version>${postgresql.version}</version>
             </dependency>
+            <!--ojdbc8 is FUTC license, we use it test only-->
+            <dependency>
+                <groupId>com.oracle.database.jdbc</groupId>
+                <artifactId>ojdbc8</artifactId>
+                <version>${oracle.jdbc.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <!-- mysql-connector-java is LGPL license, we use it test only -->
+            <dependency>
+                <groupId>mysql</groupId>
+                <artifactId>mysql-connector-java</artifactId>
+                <version>${mysql.jdbc.version}</version>
+                <scope>provided</scope>
+            </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>
                 <artifactId>druid-spring-boot-starter</artifactId>
@@ -1018,6 +1034,13 @@
                 <groupId>com.ververica</groupId>
                 <artifactId>flink-connector-oracle-cdc</artifactId>
                 <version>${flink.connector.oracle.cdc.version}</version>
+                <!-- ojdbc8 is FUTC license and needs to be excluded -->
+                <exclusions>
+                    <exclusion>
+                        <groupId>com.oracle.ojdbc</groupId>
+                        <artifactId>ojdbc8</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
 
             <dependency>