You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/27 09:09:43 UTC

[incubator-seatunnel] branch api-draft updated: [bugfix] Change the JDBC data type (#2065)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 682c58db5 [bugfix] Change the JDBC data type (#2065)
682c58db5 is described below

commit 682c58db5420573b0260ef2886df32afc6453554
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Mon Jun 27 17:09:37 2022 +0800

    [bugfix] Change the JDBC data type (#2065)
---
 .../seatunnel/jdbc/catalog/MySqlCatalog.java           |  7 ++++---
 .../internal/converter/AbstractJdbcRowConverter.java   | 18 ++++++++++--------
 .../jdbc/internal/dialect/mysql/MySqlTypeMapper.java   | 13 ++++++++-----
 .../connectors/seatunnel/jdbc/source/JdbcSource.java   | 17 +++++++++++------
 4 files changed, 33 insertions(+), 22 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
index 69638d042..4f6e61ce5 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
@@ -26,8 +26,9 @@ import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.PrimitiveArrayType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
 import com.mysql.cj.MysqlType;
@@ -203,13 +204,13 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
             case MEDIUMBLOB:
             case LONGBLOB:
             case GEOMETRY:
-                return PrimitiveArrayType.PRIMITIVE_BYTE_ARRAY_TYPE;
+                return PrimitiveByteArrayType.INSTANCE;
             case BIGINT_UNSIGNED:
             case DECIMAL:
             case DECIMAL_UNSIGNED:
                 int precision = metadata.getPrecision(colIndex);
                 int scale = metadata.getScale(colIndex);
-                return BasicType.BIG_DECIMAL_TYPE;
+                return new DecimalType(precision, scale);
                 // TODO: support 'SET' & 'YEAR' type
             default:
                 throw new UnsupportedOperationException(String.format("Doesn't support MySQL type '%s' yet", mysqlType.getName()));
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index 57d3501b0..92a1eae2c 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -18,12 +18,15 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter;
 
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.PrimitiveArrayType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -58,27 +61,26 @@ public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
                 seatunnelField = rs.getShort(i);
             } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
                 seatunnelField = rs.getInt(i);
-            } else if (BasicType.BIG_INT_TYPE.equals(seaTunnelDataType)) {
-                seatunnelField = rs.getObject(i);
             } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
                 seatunnelField = rs.getLong(i);
-            } else if (BasicType.BIG_DECIMAL_TYPE.equals(seaTunnelDataType)) {
-                seatunnelField = rs.getBigDecimal(i);
+            } else if (seaTunnelDataType instanceof DecimalType) {
+                Object value = rs.getObject(i);
+                seatunnelField = value instanceof BigInteger ?
+                    new BigDecimal((BigInteger) value, 0)
+                    : value;
             } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
                 seatunnelField = rs.getFloat(i);
             } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
                 seatunnelField = rs.getDouble(i);
             } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
                 seatunnelField = rs.getString(i);
-            } else if (BasicType.DATE_TYPE.equals(seaTunnelDataType)) {
-                seatunnelField = rs.getObject(i);
             } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) {
                 seatunnelField = rs.getTime(i).toLocalTime();
             } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) {
                 seatunnelField = rs.getDate(i).toLocalDate();
             } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) {
                 seatunnelField = rs.getTimestamp(i).toLocalDateTime();
-            } else if (PrimitiveArrayType.PRIMITIVE_BYTE_ARRAY_TYPE.equals(seaTunnelDataType)) {
+            } else if (PrimitiveByteArrayType.INSTANCE.equals(seaTunnelDataType)) {
                 seatunnelField = rs.getBytes(i);
             } else {
                 throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
index 3b853a2c3..db432f62d 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
@@ -18,8 +18,9 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
 
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.PrimitiveArrayType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
@@ -84,6 +85,7 @@ public class MySqlTypeMapper implements JdbcDialectTypeMapper {
     private static final String MYSQL_VARBINARY = "VARBINARY";
     private static final String MYSQL_GEOMETRY = "GEOMETRY";
 
+    @SuppressWarnings("checkstyle:MagicNumber")
     @Override
     public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
         String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
@@ -107,10 +109,11 @@ public class MySqlTypeMapper implements JdbcDialectTypeMapper {
             case MYSQL_BIGINT:
                 return BasicType.LONG_TYPE;
             case MYSQL_BIGINT_UNSIGNED:
-                return BasicType.BIG_INT_TYPE;
+                return new DecimalType(20, 0);
             case MYSQL_DECIMAL:
+                return new DecimalType(precision, scale);
             case MYSQL_DECIMAL_UNSIGNED:
-                return BasicType.BIG_DECIMAL_TYPE;
+                return new DecimalType(precision + 1, scale);
             case MYSQL_FLOAT:
                 return BasicType.FLOAT_TYPE;
             case MYSQL_FLOAT_UNSIGNED:
@@ -145,15 +148,15 @@ public class MySqlTypeMapper implements JdbcDialectTypeMapper {
             case MYSQL_TIMESTAMP:
                 return LocalTimeType.LOCAL_DATE_TIME_TYPE;
 
-            //Doesn't support binary yet
             case MYSQL_TINYBLOB:
             case MYSQL_MEDIUMBLOB:
             case MYSQL_BLOB:
             case MYSQL_LONGBLOB:
             case MYSQL_VARBINARY:
             case MYSQL_BINARY:
-                return PrimitiveArrayType.PRIMITIVE_BYTE_ARRAY_TYPE;
+                return PrimitiveByteArrayType.INSTANCE;
 
+            //Doesn't support yet
             case MYSQL_GEOMETRY:
             case MYSQL_UNKNOWN:
             default:
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 41d07e92e..73a61b3a3 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -97,13 +98,13 @@ public class JdbcSource implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
     }
 
     @Override
-    public SeaTunnelContext getSeaTunnelContext() {
-        return seaTunnelContext;
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
     }
 
     @Override
@@ -125,6 +126,11 @@ public class JdbcSource implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
         return new JdbcSourceReader(inputFormat, readerContext);
     }
 
+    @Override
+    public Serializer<JdbcSourceSplit> getSplitSerializer() {
+        return SeaTunnelSource.super.getSplitSerializer();
+    }
+
     @Override
     public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> createEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext) throws Exception {
         return new JdbcSourceSplitEnumerator(enumeratorContext, jdbcSourceOptions, partitionParameter);
@@ -204,8 +210,7 @@ public class JdbcSource implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
     }
 
     private boolean isNumericType(SeaTunnelDataType<?> type) {
-        return type.equals(BasicType.INT_TYPE) || type.equals(BasicType.BIG_INT_TYPE)
-            || type.equals(BasicType.LONG_TYPE);
+        return type.equals(BasicType.INT_TYPE) || type.equals(BasicType.LONG_TYPE);
     }
 
 }