You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/05/30 05:00:58 UTC
[incubator-inlong] branch master updated: [INLONG-4394][Sort] Add Oracle data load support (#4413)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 323e695e4 [INLONG-4394][Sort] Add Oracle data load support (#4413)
323e695e4 is described below
commit 323e695e4de8e21001d404d3073a948a27f280f9
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Mon May 30 13:00:53 2022 +0800
[INLONG-4394][Sort] Add Oracle data load support (#4413)
---
.../apache/inlong/sort/protocol/node/LoadNode.java | 6 +-
.../org/apache/inlong/sort/protocol/node/Node.java | 4 +-
.../sort/protocol/node/load/OracleLoadNode.java | 102 ++++++++
.../protocol/node/load/OracleLoadNodeTest.java | 46 ++++
inlong-sort/sort-connectors/jdbc/pom.xml | 12 +
.../jdbc/converter/AbstractJdbcRowConverter.java | 283 +++++++++++++++++++++
.../clickhouse/ClickHouseRowConverter.java | 7 +-
.../jdbc/converter/oracle/OracleRowConverter.java | 181 +++++++++++++
.../sqlserver/SqlServerRowConvert.java | 6 +-
.../sort/jdbc/dialect/ClickHouseDialect.java | 2 +-
.../inlong/sort/jdbc/dialect/OracleDialect.java | 175 +++++++++++++
.../inlong/sort/jdbc/dialect/SqlServerDialect.java | 2 +-
.../inlong/sort/jdbc/table/JdbcDialects.java | 2 +
.../inlong/sort/parser/OracleLoadSqlParseTest.java | 124 +++++++++
pom.xml | 23 ++
15 files changed, 963 insertions(+), 12 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 98d3324b8..6fbfc24a8 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -34,6 +34,7 @@ import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
+import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
@@ -60,7 +61,8 @@ import java.util.Map;
@JsonSubTypes.Type(value = ClickHouseLoadNode.class, name = "clickHouseLoad"),
@JsonSubTypes.Type(value = SqlServerLoadNode.class, name = "sqlserverLoad"),
@JsonSubTypes.Type(value = TDSQLPostgresLoadNode.class, name = "tdsqlPostgresLoad"),
- @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad")
+ @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad"),
+ @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad")
})
@NoArgsConstructor
@Data
@@ -98,7 +100,7 @@ public abstract class LoadNode implements Node {
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
- @JsonProperty("properties") Map<String, String> properties) {
+ @Nullable @JsonProperty("properties") Map<String, String> properties) {
this.id = Preconditions.checkNotNull(id, "id is null");
this.name = name;
this.fields = Preconditions.checkNotNull(fields, "fields is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 470901782..0c62e0d37 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -36,6 +36,7 @@ import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
+import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
@@ -73,7 +74,8 @@ import java.util.TreeMap;
@JsonSubTypes.Type(value = ClickHouseLoadNode.class, name = "clickHouseLoad"),
@JsonSubTypes.Type(value = SqlServerLoadNode.class, name = "sqlserverLoad"),
@JsonSubTypes.Type(value = TDSQLPostgresLoadNode.class, name = "tdsqlPostgresLoad"),
- @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad")
+ @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad"),
+ @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad")
})
public interface Node {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java
new file mode 100644
index 000000000..c725c07e1
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Oracle load node can load data into Oracle
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("oracleLoad")
+@Data
+@NoArgsConstructor
+public class OracleLoadNode extends LoadNode implements Serializable {
+ /**
+ * jdbc:oracle://host:port/database
+ */
+ @JsonProperty("url")
+ private String url;
+ @JsonProperty("username")
+ private String username;
+ @JsonProperty("password")
+ private String password;
+ @JsonProperty("tableName")
+ private String tableName;
+ @JsonProperty("primaryKey")
+ private String primaryKey;
+
+ @JsonCreator
+ public OracleLoadNode(
+ @JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("filters") List<FilterFunction> filters,
+ @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+ @JsonProperty("sinkParallelism") Integer sinkParallelism,
+ @JsonProperty("properties") Map<String, String> properties,
+ @JsonProperty("url") String url,
+ @JsonProperty("username") String username,
+ @JsonProperty("password") String password,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("primaryKey") String primaryKey) {
+ super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ this.url = Preconditions.checkNotNull(url, "url is null");
+ this.username = Preconditions.checkNotNull(username, "username is null");
+ this.password = Preconditions.checkNotNull(password, "password is null");
+ this.tableName = Preconditions.checkNotNull(tableName, "tableName is null");
+ this.primaryKey = primaryKey;
+ }
+
+ @Override
+ public Map<String, String> tableOptions() {
+ Map<String, String> options = super.tableOptions();
+ options.put("connector", "jdbc-inlong");
+ options.put("url", url);
+ options.put("username", username);
+ options.put("password", password);
+ options.put("table-name", tableName);
+ return options;
+ }
+
+ @Override
+ public String genTableName() {
+ return String.format("table_%s", super.getId());
+ }
+
+ @Override
+ public String getPrimaryKey() {
+ return primaryKey;
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java
new file mode 100644
index 000000000..ca972fe23
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+
+import java.util.Collections;
+
+/**
+ * Test for {@link OracleLoadNode}
+ */
+public class OracleLoadNodeTest extends SerializeBaseTest<OracleLoadNode> {
+
+ @Override
+ public OracleLoadNode getTestObject() {
+ return new OracleLoadNode("1", "mysql_output",
+ Collections.singletonList(new FieldInfo("NAME", new StringFormatInfo())),
+ Collections.singletonList(new FieldRelationShip(new FieldInfo("name",
+ new StringFormatInfo()), new FieldInfo("NAME", new StringFormatInfo()))),
+ null, null, 1, null,
+ "jdbc:oracle:thin:@localhost:1521:xe",
+ "inlong",
+ "inlong",
+ "user",
+ "name");
+ }
+}
diff --git a/inlong-sort/sort-connectors/jdbc/pom.xml b/inlong-sort/sort-connectors/jdbc/pom.xml
index a4fad240b..86b3610c7 100644
--- a/inlong-sort/sort-connectors/jdbc/pom.xml
+++ b/inlong-sort/sort-connectors/jdbc/pom.xml
@@ -43,6 +43,16 @@
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
+ <!--for oracle-->
+ <dependency>
+ <groupId>com.oracle.database.jdbc</groupId>
+ <artifactId>ojdbc8</artifactId>
+ </dependency>
+ <!--for mysql-->
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ </dependency>
<!--for jdbc-->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -74,6 +84,8 @@
<include>org.apache.flink:flink-connector-jdbc_${flink.scala.binary.version}</include>
<include>org.postgresql:postgresql</include>
<include>ru.yandex.clickhouse:clickhouse-jdbc</include>
+ <include>mysql:mysql-connector-java</include>
+ <include>com.oracle.database.jdbc:*</include>
</includes>
</artifactSet>
</configuration>
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java
new file mode 100644
index 000000000..9a800b163
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.converter;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all converters that convert between JDBC object and Flink internal object.
+ * Copy of ${@link org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter}
+ * to override the method createInternalConverter and createExternalConverter in subclasses
+ * by make JdbcDeserializationConverter,JdbcSerializationConverter public.
+ */
+public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
+
+ protected final RowType rowType;
+ protected final JdbcDeserializationConverter[] toInternalConverters;
+ protected final JdbcSerializationConverter[] toExternalConverters;
+ protected final LogicalType[] fieldTypes;
+
+ public AbstractJdbcRowConverter(RowType rowType) {
+ this.rowType = checkNotNull(rowType);
+ this.fieldTypes =
+ rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .toArray(LogicalType[]::new);
+ this.toInternalConverters = new JdbcDeserializationConverter[rowType.getFieldCount()];
+ this.toExternalConverters = new JdbcSerializationConverter[rowType.getFieldCount()];
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ toInternalConverters[i] = createNullableInternalConverter(rowType.getTypeAt(i));
+ toExternalConverters[i] = createNullableExternalConverter(fieldTypes[i]);
+ }
+ }
+
+ public abstract String converterName();
+
+ @Override
+ public RowData toInternal(ResultSet resultSet) throws SQLException {
+ GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
+ for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
+ Object field = resultSet.getObject(pos + 1);
+ genericRowData.setField(pos, toInternalConverters[pos].deserialize(field));
+ }
+ return genericRowData;
+ }
+
+ @Override
+ public FieldNamedPreparedStatement toExternal(
+ RowData rowData, FieldNamedPreparedStatement statement) throws SQLException {
+ for (int index = 0; index < rowData.getArity(); index++) {
+ toExternalConverters[index].serialize(rowData, index, statement);
+ }
+ return statement;
+ }
+
+ /**
+ * Create a nullable runtime {@link JdbcDeserializationConverter} from given {@link
+ * LogicalType}.
+ */
+ protected JdbcDeserializationConverter createNullableInternalConverter(LogicalType type) {
+ return wrapIntoNullableInternalConverter(createInternalConverter(type));
+ }
+
+ protected JdbcDeserializationConverter wrapIntoNullableInternalConverter(
+ JdbcDeserializationConverter jdbcDeserializationConverter) {
+ return val -> {
+ if (val == null) {
+ return null;
+ } else {
+ return jdbcDeserializationConverter.deserialize(val);
+ }
+ };
+ }
+
+ protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return val -> null;
+ case BOOLEAN:
+ case FLOAT:
+ case DOUBLE:
+ case INTERVAL_YEAR_MONTH:
+ case INTERVAL_DAY_TIME:
+ return val -> val;
+ case TINYINT:
+ return val -> ((Integer) val).byteValue();
+ case SMALLINT:
+ // Converter for small type that casts value to int and then return short value,
+ // since
+ // JDBC 1.0 use int type for small values.
+ return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
+ case INTEGER:
+ return val -> val;
+ case BIGINT:
+ return val -> val;
+ case DECIMAL:
+ final int precision = ((DecimalType) type).getPrecision();
+ final int scale = ((DecimalType) type).getScale();
+ // using decimal(20, 0) to support db type bigint unsigned, user should define
+ // decimal(20, 0) in SQL,
+ // but other precision like decimal(30, 0) can work too from lenient consideration.
+ return val ->
+ val instanceof BigInteger
+ ? DecimalData.fromBigDecimal(
+ new BigDecimal((BigInteger) val, 0), precision, scale)
+ : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
+ case DATE:
+ return val -> (int) (((Date) val).toLocalDate().toEpochDay());
+ case TIME_WITHOUT_TIME_ZONE:
+ return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return val ->
+ val instanceof LocalDateTime
+ ? TimestampData.fromLocalDateTime((LocalDateTime) val)
+ : TimestampData.fromTimestamp((Timestamp) val);
+ case CHAR:
+ case VARCHAR:
+ return val -> StringData.fromString((String) val);
+ case BINARY:
+ case VARBINARY:
+ return val -> (byte[]) val;
+ case ARRAY:
+ case ROW:
+ case MAP:
+ case MULTISET:
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type:" + type);
+ }
+ }
+
+ /**
+ * Create a nullable JDBC f{@link JdbcSerializationConverter} from given sql type.
+ */
+ protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) {
+ return wrapIntoNullableExternalConverter(createExternalConverter(type), type);
+ }
+
+ protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
+ JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
+ final int sqlType =
+ JdbcTypeUtil.typeInformationToSqlType(
+ TypeConversions.fromDataTypeToLegacyInfo(
+ TypeConversions.fromLogicalToDataType(type)));
+ return (val, index, statement) -> {
+ if (val == null
+ || val.isNullAt(index)
+ || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
+ statement.setNull(index, sqlType);
+ } else {
+ jdbcSerializationConverter.serialize(val, index, statement);
+ }
+ };
+ }
+
+ protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ return (val, index, statement) ->
+ statement.setBoolean(index, val.getBoolean(index));
+ case TINYINT:
+ return (val, index, statement) -> statement.setByte(index, val.getByte(index));
+ case SMALLINT:
+ return (val, index, statement) -> statement.setShort(index, val.getShort(index));
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ return (val, index, statement) -> statement.setInt(index, val.getInt(index));
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ return (val, index, statement) -> statement.setLong(index, val.getLong(index));
+ case FLOAT:
+ return (val, index, statement) -> statement.setFloat(index, val.getFloat(index));
+ case DOUBLE:
+ return (val, index, statement) -> statement.setDouble(index, val.getDouble(index));
+ case CHAR:
+ case VARCHAR:
+ // value is BinaryString
+ return (val, index, statement) ->
+ statement.setString(index, val.getString(index).toString());
+ case BINARY:
+ case VARBINARY:
+ return (val, index, statement) -> statement.setBytes(index, val.getBinary(index));
+ case DATE:
+ return (val, index, statement) ->
+ statement.setDate(
+ index, Date.valueOf(LocalDate.ofEpochDay(val.getInt(index))));
+ case TIME_WITHOUT_TIME_ZONE:
+ return (val, index, statement) ->
+ statement.setTime(
+ index,
+ Time.valueOf(
+ LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L)));
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ final int timestampPrecision = ((TimestampType) type).getPrecision();
+ return (val, index, statement) ->
+ statement.setTimestamp(
+ index, val.getTimestamp(index, timestampPrecision).toTimestamp());
+ case DECIMAL:
+ final int decimalPrecision = ((DecimalType) type).getPrecision();
+ final int decimalScale = ((DecimalType) type).getScale();
+ return (val, index, statement) ->
+ statement.setBigDecimal(
+ index,
+ val.getDecimal(index, decimalPrecision, decimalScale)
+ .toBigDecimal());
+ case ARRAY:
+ case MAP:
+ case MULTISET:
+ case ROW:
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type:" + type);
+ }
+ }
+
+ /**
+ * Runtime converter to convert JDBC field to {@link RowData} type object.
+ */
+ @FunctionalInterface
+ public interface JdbcDeserializationConverter extends Serializable {
+ /**
+ * Convert a jdbc field object of {@link ResultSet} to the internal data structure object.
+ *
+ * @param jdbcField
+ */
+ Object deserialize(Object jdbcField) throws SQLException;
+ }
+
+ /**
+ * Runtime converter to convert {@link RowData} field to java object and fill into the {@link
+ * PreparedStatement}.
+ */
+ @FunctionalInterface
+ public interface JdbcSerializationConverter extends Serializable {
+ void serialize(RowData rowData, int index, FieldNamedPreparedStatement statement)
+ throws SQLException;
+ }
+}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/clickhouse/ClickHouseRowConverter.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/clickhouse/ClickHouseRowConverter.java
similarity index 87%
rename from inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/clickhouse/ClickHouseRowConverter.java
rename to inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/clickhouse/ClickHouseRowConverter.java
index 198f22e77..4046c7b6a 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/clickhouse/ClickHouseRowConverter.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/clickhouse/ClickHouseRowConverter.java
@@ -16,14 +16,13 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.jdbc.clickhouse;
+package org.apache.inlong.sort.jdbc.converter.clickhouse;
-import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.converter.AbstractJdbcRowConverter;
/**
- * Runtime converter that responsible to convert between JDBC object and Flink internal object for
- * Derby.
+ * Runtime converter that responsible to convert between JDBC object and Flink internal object for ClickHouse.
*/
public class ClickHouseRowConverter extends AbstractJdbcRowConverter {
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/oracle/OracleRowConverter.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/oracle/OracleRowConverter.java
new file mode 100644
index 000000000..595b57456
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/oracle/OracleRowConverter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.converter.oracle;
+
+import oracle.jdbc.internal.OracleBlob;
+import oracle.jdbc.internal.OracleClob;
+import oracle.sql.BINARY_DOUBLE;
+import oracle.sql.BINARY_FLOAT;
+import oracle.sql.CHAR;
+import oracle.sql.DATE;
+import oracle.sql.NUMBER;
+import oracle.sql.RAW;
+import oracle.sql.TIMESTAMP;
+import oracle.sql.TIMESTAMPTZ;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.converter.AbstractJdbcRowConverter;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+
+/**
+ * Runtime converter that responsible to convert between JDBC object and Flink internal object for Oracle.
+ */
+public class OracleRowConverter extends AbstractJdbcRowConverter {
+
+ private static final long serialVersionUID = 1L;
+
+ public OracleRowConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return val -> null;
+ case BOOLEAN:
+ return val -> val instanceof NUMBER ? ((NUMBER) val).booleanValue() : val;
+ case FLOAT:
+ return val ->
+ val instanceof NUMBER
+ ? ((NUMBER) val).floatValue()
+ : val instanceof BINARY_FLOAT
+ ? ((BINARY_FLOAT) val).floatValue()
+ : val instanceof BigDecimal
+ ? ((BigDecimal) val).floatValue()
+ : val;
+ case DOUBLE:
+ return val ->
+ val instanceof NUMBER
+ ? ((NUMBER) val).doubleValue()
+ : val instanceof BINARY_DOUBLE
+ ? ((BINARY_DOUBLE) val).doubleValue()
+ : val instanceof BigDecimal
+ ? ((BigDecimal) val).doubleValue()
+ : val;
+ case TINYINT:
+ return val ->
+ val instanceof NUMBER
+ ? ((NUMBER) val).byteValue()
+ : val instanceof BigDecimal ? ((BigDecimal) val).byteValue() : val;
+ case SMALLINT:
+ return val ->
+ val instanceof NUMBER
+ ? ((NUMBER) val).shortValue()
+ : val instanceof BigDecimal ? ((BigDecimal) val).shortValue() : val;
+ case INTEGER:
+ return val ->
+ val instanceof NUMBER
+ ? ((NUMBER) val).intValue()
+ : val instanceof BigDecimal ? ((BigDecimal) val).intValue() : val;
+ case BIGINT:
+ return val ->
+ val instanceof NUMBER
+ ? ((NUMBER) val).longValue()
+ : val instanceof BigDecimal ? ((BigDecimal) val).longValue() : val;
+ case DECIMAL:
+ final int precision = ((DecimalType) type).getPrecision();
+ final int scale = ((DecimalType) type).getScale();
+ return val ->
+ val instanceof BigInteger
+ ? DecimalData.fromBigDecimal(
+ new BigDecimal((BigInteger) val, 0), precision, scale)
+ : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
+ case CHAR:
+ case VARCHAR:
+ return val ->
+ (val instanceof CHAR)
+ ? StringData.fromString(((CHAR) val).getString())
+ : (val instanceof OracleClob)
+ ? StringData.fromString(((OracleClob) val).stringValue())
+ : StringData.fromString((String) val);
+ case BINARY:
+ case VARBINARY:
+ case RAW:
+ return val ->
+ val instanceof RAW
+ ? ((RAW) val).getBytes()
+ : val instanceof OracleBlob
+ ? ((OracleBlob) val)
+ .getBytes(1, (int) ((OracleBlob) val).length())
+ : val.toString().getBytes();
+
+ case INTERVAL_YEAR_MONTH:
+ case INTERVAL_DAY_TIME:
+ return val -> val instanceof NUMBER ? ((NUMBER) val).intValue() : val;
+ case DATE:
+ return val ->
+ val instanceof DATE
+ ? (int) (((DATE) val).dateValue().toLocalDate().toEpochDay())
+ : val instanceof Timestamp
+ ? (int)
+ (((Timestamp) val)
+ .toLocalDateTime()
+ .toLocalDate()
+ .toEpochDay())
+ : (int) (((Date) val).toLocalDate().toEpochDay());
+ case TIME_WITHOUT_TIME_ZONE:
+ return val ->
+ val instanceof DATE
+ ? (int)
+ (((DATE) val).timeValue().toLocalTime().toNanoOfDay()
+ / 1_000_000L)
+ : (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return val ->
+ val instanceof TIMESTAMP
+ ? TimestampData.fromTimestamp(((TIMESTAMP) val).timestampValue())
+ : TimestampData.fromTimestamp((Timestamp) val);
+ case TIMESTAMP_WITH_TIME_ZONE:
+ return val -> {
+ if (val instanceof TIMESTAMPTZ) {
+ final TIMESTAMPTZ ts = (TIMESTAMPTZ) val;
+ final ZonedDateTime zdt =
+ ZonedDateTime.ofInstant(
+ ts.timestampValue().toInstant(),
+ ts.getTimeZone().toZoneId());
+ return TimestampData.fromLocalDateTime(zdt.toLocalDateTime());
+ } else {
+ return TimestampData.fromTimestamp((Timestamp) val);
+ }
+ };
+ case ARRAY:
+ case ROW:
+ case MAP:
+ case MULTISET:
+ default:
+ return super.createInternalConverter(type);
+ }
+ }
+
+ @Override
+ public String converterName() {
+ return "Oracle";
+ }
+}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/sqlserver/SqlServerRowConvert.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/sqlserver/SqlServerRowConvert.java
similarity index 89%
rename from inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/sqlserver/SqlServerRowConvert.java
rename to inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/sqlserver/SqlServerRowConvert.java
index a355d6b4e..3900fe6c9 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/sqlserver/SqlServerRowConvert.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/sqlserver/SqlServerRowConvert.java
@@ -16,17 +16,17 @@
*
*/
-package org.apache.inlong.sort.jdbc.sqlserver;
+package org.apache.inlong.sort.jdbc.converter.sqlserver;
-import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.converter.AbstractJdbcRowConverter;
/**
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* SqlServer.
*/
public class SqlServerRowConvert extends AbstractJdbcRowConverter {
-
+
public SqlServerRowConvert(RowType rowType) {
super(rowType);
}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java
index 781f0b078..9216f6485 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java
@@ -21,7 +21,7 @@ package org.apache.inlong.sort.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.inlong.sort.jdbc.clickhouse.ClickHouseRowConverter;
+import org.apache.inlong.sort.jdbc.converter.clickhouse.ClickHouseRowConverter;
import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
import java.util.Arrays;
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/OracleDialect.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/OracleDialect.java
new file mode 100644
index 000000000..b158bf1c4
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/OracleDialect.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.converter.oracle.OracleRowConverter;
+import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * JDBC dialect for Oracle.
+ */
+public class OracleDialect extends AbstractJdbcDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ // Define MAX/MIN precision of TIMESTAMP type according to Oracle docs:
+ // https://www.techonthenet.com/oracle/datatypes.php
+ private static final int MAX_TIMESTAMP_PRECISION = 9;
+ private static final int MIN_TIMESTAMP_PRECISION = 1;
+
+ // Define MAX/MIN precision of DECIMAL type according to Oracle docs:
+ // https://www.techonthenet.com/oracle/datatypes.php
+ private static final int MAX_DECIMAL_PRECISION = 38;
+ private static final int MIN_DECIMAL_PRECISION = 1;
+
+ @Override
+ public JdbcRowConverter getRowConverter(RowType rowType) {
+ return new OracleRowConverter(rowType);
+ }
+
+ @Override
+ public String getLimitClause(long limit) {
+ return "FETCH FIRST " + limit + " ROWS ONLY";
+ }
+
+ @Override
+ public Optional<String> defaultDriverName() {
+ return Optional.of("oracle.jdbc.OracleDriver");
+ }
+
+ @Override
+ public String dialectName() {
+ return "Oracle";
+ }
+
+ @Override
+ public boolean canHandle(String url) {
+ return url.startsWith("jdbc:oracle:");
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return identifier;
+ }
+
+ @Override
+ public Optional<String> getUpsertStatement(
+ String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+
+ String sourceFields =
+ Arrays.stream(fieldNames)
+ .map(f -> ":" + f + " " + quoteIdentifier(f))
+ .collect(Collectors.joining(", "));
+
+ String onClause =
+ Arrays.stream(uniqueKeyFields)
+ .map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f))
+ .collect(Collectors.joining(" and "));
+
+ final Set<String> uniqueKeyFieldsSet =
+ Arrays.stream(uniqueKeyFields).collect(Collectors.toSet());
+ String updateClause =
+ Arrays.stream(fieldNames)
+ .filter(f -> !uniqueKeyFieldsSet.contains(f))
+ .map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f))
+ .collect(Collectors.joining(", "));
+
+ String insertFields =
+ Arrays.stream(fieldNames)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+
+ String valuesClause =
+ Arrays.stream(fieldNames)
+ .map(f -> "s." + quoteIdentifier(f))
+ .collect(Collectors.joining(", "));
+
+ // if we can't divide schema and table-name is risky to call quoteIdentifier(tableName)
+ // for example [tbo].[sometable] is ok but [tbo.sometable] is not
+ String mergeQuery =
+ " MERGE INTO "
+ + tableName
+ + " t "
+ + " USING (SELECT "
+ + sourceFields
+ + " FROM DUAL) s "
+ + " ON ("
+ + onClause
+ + ") "
+ + " WHEN MATCHED THEN UPDATE SET "
+ + updateClause
+ + " WHEN NOT MATCHED THEN INSERT ("
+ + insertFields
+ + ")"
+ + " VALUES ("
+ + valuesClause
+ + ")";
+
+ return Optional.of(mergeQuery);
+ }
+
+ @Override
+ public int maxDecimalPrecision() {
+ return MAX_DECIMAL_PRECISION;
+ }
+
+ @Override
+ public int minDecimalPrecision() {
+ return MIN_DECIMAL_PRECISION;
+ }
+
+ @Override
+ public int maxTimestampPrecision() {
+ return MAX_TIMESTAMP_PRECISION;
+ }
+
+ @Override
+ public int minTimestampPrecision() {
+ return MIN_TIMESTAMP_PRECISION;
+ }
+
+ @Override
+ public List<LogicalTypeRoot> unsupportedTypes() {
+ // The data types used in Oracle are list at:
+ // https://www.techonthenet.com/oracle/datatypes.php
+ return Arrays.asList(
+ LogicalTypeRoot.BINARY,
+ LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
+ LogicalTypeRoot.INTERVAL_YEAR_MONTH,
+ LogicalTypeRoot.INTERVAL_DAY_TIME,
+ LogicalTypeRoot.MULTISET,
+ LogicalTypeRoot.MAP,
+ LogicalTypeRoot.ROW,
+ LogicalTypeRoot.DISTINCT_TYPE,
+ LogicalTypeRoot.STRUCTURED_TYPE,
+ LogicalTypeRoot.NULL,
+ LogicalTypeRoot.RAW,
+ LogicalTypeRoot.SYMBOL,
+ LogicalTypeRoot.UNRESOLVED);
+ }
+}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/SqlServerDialect.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/SqlServerDialect.java
index 678a980ac..5ca5a5172 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/SqlServerDialect.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/SqlServerDialect.java
@@ -22,7 +22,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.inlong.sort.jdbc.sqlserver.SqlServerRowConvert;
+import org.apache.inlong.sort.jdbc.converter.sqlserver.SqlServerRowConvert;
import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
import java.util.Arrays;
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
index a9d473661..e62cf5b2c 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.jdbc.table;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
+import org.apache.inlong.sort.jdbc.dialect.OracleDialect;
import org.apache.inlong.sort.jdbc.dialect.SqlServerDialect;
import org.apache.inlong.sort.jdbc.dialect.TDSQLPostgresDialect;
@@ -38,6 +39,7 @@ public final class JdbcDialects {
DIALECTS.add(new MySQLDialect());
DIALECTS.add(new TDSQLPostgresDialect());
DIALECTS.add(new SqlServerDialect());
+ DIALECTS.add(new OracleDialect());
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
new file mode 100644
index 000000000..b7553c183
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for {@link OracleLoadNode}
+ */
+public class OracleLoadSqlParseTest extends AbstractTestBase {
+
+ private MySqlExtractNode buildMySQLExtractNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()));
+ Map<String, String> map = new HashMap<>();
+ return new MySqlExtractNode("1", "mysql_input", fields,
+ null, map, "id",
+ Collections.singletonList("student"), "localhost", "inlong",
+ "inlong", "inlong", null, null,
+ null, null);
+ }
+
+ private Node buildOracleLoadNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("ID", new LongFormatInfo()),
+ new FieldInfo("NAME", new StringFormatInfo()),
+ new FieldInfo("AGE", new IntFormatInfo())
+ );
+ List<FieldRelationShip> relations = Arrays
+ .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("ID", new LongFormatInfo())),
+ new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("NAME", new StringFormatInfo())),
+ new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("AGE", new IntFormatInfo()))
+ );
+ return new OracleLoadNode("2", "oracle_output", fields, relations, null,
+ null, null, null, "jdbc:oracle:thin:@localhost:1521:xe",
+ "flinkuser", "flinkpw", "student", "ID");
+ }
+
+ /**
+ * build node relation
+ *
+ * @param inputs extract node
+ * @param outputs load node
+ * @return node relation
+ */
+ private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelationShip(inputIds, outputIds);
+ }
+
+ /**
+ * Test flink sql task for extract is mysql {@link MySqlExtractNode} and load is mysql {@link OracleLoadNode}
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testMySqlLoadSqlParse() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildMySQLExtractNode();
+ Node outputNode = buildOracleLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 18f5ee3ef..7331db48e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,8 @@
<rocksdb.version>6.14.6</rocksdb.version>
<hadoop.version>2.10.1</hadoop.version>
<postgresql.version>42.3.4</postgresql.version>
+ <oracle.jdbc.version>19.3.0.0</oracle.jdbc.version>
+ <mysql.jdbc.version>8.0.21</mysql.jdbc.version>
<mybatis.starter.version>2.1.3</mybatis.starter.version>
<mybatis.version>3.5.9</mybatis.version>
<druid.version>1.2.6</druid.version>
@@ -519,6 +521,20 @@
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
+ <!--ojdbc8 is FUTC license, we use it test only-->
+ <dependency>
+ <groupId>com.oracle.database.jdbc</groupId>
+ <artifactId>ojdbc8</artifactId>
+ <version>${oracle.jdbc.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- mysql-connector-java is LGPL license, we use it test only -->
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql.jdbc.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
@@ -1018,6 +1034,13 @@
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>${flink.connector.oracle.cdc.version}</version>
+ <!-- ojdbc8 is FUTC license and needs to be excluded -->
+ <exclusions>
+ <exclusion>
+ <groupId>com.oracle.ojdbc</groupId>
+ <artifactId>ojdbc8</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>