You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/27 09:09:43 UTC
[incubator-seatunnel] branch api-draft updated: [bugfix] Change the JDBC data type (#2065)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 682c58db5 [bugfix] Change the JDBC data type (#2065)
682c58db5 is described below
commit 682c58db5420573b0260ef2886df32afc6453554
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Mon Jun 27 17:09:37 2022 +0800
[bugfix] Change the JDBC data type (#2065)
---
.../seatunnel/jdbc/catalog/MySqlCatalog.java | 7 ++++---
.../internal/converter/AbstractJdbcRowConverter.java | 18 ++++++++++--------
.../jdbc/internal/dialect/mysql/MySqlTypeMapper.java | 13 ++++++++-----
.../connectors/seatunnel/jdbc/source/JdbcSource.java | 17 +++++++++++------
4 files changed, 33 insertions(+), 22 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
index 69638d042..4f6e61ce5 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
@@ -26,8 +26,9 @@ import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.PrimitiveArrayType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import com.mysql.cj.MysqlType;
@@ -203,13 +204,13 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
case MEDIUMBLOB:
case LONGBLOB:
case GEOMETRY:
- return PrimitiveArrayType.PRIMITIVE_BYTE_ARRAY_TYPE;
+ return PrimitiveByteArrayType.INSTANCE;
case BIGINT_UNSIGNED:
case DECIMAL:
case DECIMAL_UNSIGNED:
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
- return BasicType.BIG_DECIMAL_TYPE;
+ return new DecimalType(precision, scale);
// TODO: support 'SET' & 'YEAR' type
default:
throw new UnsupportedOperationException(String.format("Doesn't support MySQL type '%s' yet", mysqlType.getName()));
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index 57d3501b0..92a1eae2c 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -18,12 +18,15 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.PrimitiveArrayType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -58,27 +61,26 @@ public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
seatunnelField = rs.getShort(i);
} else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getInt(i);
- } else if (BasicType.BIG_INT_TYPE.equals(seaTunnelDataType)) {
- seatunnelField = rs.getObject(i);
} else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getLong(i);
- } else if (BasicType.BIG_DECIMAL_TYPE.equals(seaTunnelDataType)) {
- seatunnelField = rs.getBigDecimal(i);
+ } else if (seaTunnelDataType instanceof DecimalType) {
+ Object value = rs.getObject(i);
+ seatunnelField = value instanceof BigInteger ?
+ new BigDecimal((BigInteger) value, 0)
+ : value;
} else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getFloat(i);
} else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getDouble(i);
} else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getString(i);
- } else if (BasicType.DATE_TYPE.equals(seaTunnelDataType)) {
- seatunnelField = rs.getObject(i);
} else if (LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getTime(i).toLocalTime();
} else if (LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getDate(i).toLocalDate();
} else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getTimestamp(i).toLocalDateTime();
- } else if (PrimitiveArrayType.PRIMITIVE_BYTE_ARRAY_TYPE.equals(seaTunnelDataType)) {
+ } else if (PrimitiveByteArrayType.INSTANCE.equals(seaTunnelDataType)) {
seatunnelField = rs.getBytes(i);
} else {
throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
index 3b853a2c3..db432f62d 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
@@ -18,8 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.PrimitiveArrayType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
@@ -84,6 +85,7 @@ public class MySqlTypeMapper implements JdbcDialectTypeMapper {
private static final String MYSQL_VARBINARY = "VARBINARY";
private static final String MYSQL_GEOMETRY = "GEOMETRY";
+ @SuppressWarnings("checkstyle:MagicNumber")
@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
@@ -107,10 +109,11 @@ public class MySqlTypeMapper implements JdbcDialectTypeMapper {
case MYSQL_BIGINT:
return BasicType.LONG_TYPE;
case MYSQL_BIGINT_UNSIGNED:
- return BasicType.BIG_INT_TYPE;
+ return new DecimalType(20, 0);
case MYSQL_DECIMAL:
+ return new DecimalType(precision, scale);
case MYSQL_DECIMAL_UNSIGNED:
- return BasicType.BIG_DECIMAL_TYPE;
+ return new DecimalType(precision + 1, scale);
case MYSQL_FLOAT:
return BasicType.FLOAT_TYPE;
case MYSQL_FLOAT_UNSIGNED:
@@ -145,15 +148,15 @@ public class MySqlTypeMapper implements JdbcDialectTypeMapper {
case MYSQL_TIMESTAMP:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
- //Doesn't support binary yet
case MYSQL_TINYBLOB:
case MYSQL_MEDIUMBLOB:
case MYSQL_BLOB:
case MYSQL_LONGBLOB:
case MYSQL_VARBINARY:
case MYSQL_BINARY:
- return PrimitiveArrayType.PRIMITIVE_BYTE_ARRAY_TYPE;
+ return PrimitiveByteArrayType.INSTANCE;
+ // Not supported yet
case MYSQL_GEOMETRY:
case MYSQL_UNKNOWN:
default:
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 41d07e92e..73a61b3a3 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -97,13 +98,13 @@ public class JdbcSource implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
}
@Override
- public SeaTunnelContext getSeaTunnelContext() {
- return seaTunnelContext;
+ public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+ this.seaTunnelContext = seaTunnelContext;
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
}
@Override
@@ -125,6 +126,11 @@ public class JdbcSource implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
return new JdbcSourceReader(inputFormat, readerContext);
}
+ @Override
+ public Serializer<JdbcSourceSplit> getSplitSerializer() {
+ return SeaTunnelSource.super.getSplitSerializer();
+ }
+
@Override
public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> createEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext) throws Exception {
return new JdbcSourceSplitEnumerator(enumeratorContext, jdbcSourceOptions, partitionParameter);
@@ -204,8 +210,7 @@ public class JdbcSource implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
}
private boolean isNumericType(SeaTunnelDataType<?> type) {
- return type.equals(BasicType.INT_TYPE) || type.equals(BasicType.BIG_INT_TYPE)
- || type.equals(BasicType.LONG_TYPE);
+ return type.equals(BasicType.INT_TYPE) || type.equals(BasicType.LONG_TYPE);
}
}