You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/04/19 03:45:30 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector][Jdbc] Add DataTypeConvertor for JDBC-Postgres (#4575)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 91f512597 [Feature][Connector][Jdbc] Add DataTypeConvertor for JDBC-Postgres (#4575)
91f512597 is described below

commit 91f5125976b63a3d508bd8e19307a91d994ee465
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Wed Apr 19 11:45:22 2023 +0800

    [Feature][Connector][Jdbc] Add DataTypeConvertor for JDBC-Postgres (#4575)
---
 .../catalog/psql/PostgresDataTypeConvertor.java    | 223 +++++++++++++++++++++
 1 file changed, 223 insertions(+)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
new file mode 100644
index 000000000..81bf5ca06
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import org.apache.commons.collections4.MapUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.auto.service.AutoService;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@AutoService(DataTypeConvertor.class)
+public class PostgresDataTypeConvertor implements DataTypeConvertor<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcDialectTypeMapper.class);
+
+    public static final String PRECISION = "precision";
+    public static final String SCALE = "scale";
+
+    public static final Integer DEFAULT_PRECISION = 38;
+
+    public static final Integer DEFAULT_SCALE = 18;
+
+    // Postgres jdbc driver maps several alias to real type, we use real type rather than alias:
+    // serial2 <=> int2
+    // smallserial <=> int2
+    // serial4 <=> serial
+    // serial8 <=> bigserial
+    // smallint <=> int2
+    // integer <=> int4
+    // int <=> int4
+    // bigint <=> int8
+    // float <=> float8
+    // boolean <=> bool
+    // decimal <=> numeric
+    private static final String PG_SMALLSERIAL = "smallserial";
+    private static final String PG_SERIAL = "serial";
+    private static final String PG_BIGSERIAL = "bigserial";
+    private static final String PG_BYTEA = "bytea";
+    private static final String PG_BYTEA_ARRAY = "_bytea";
+    private static final String PG_SMALLINT = "int2";
+    private static final String PG_SMALLINT_ARRAY = "_int2";
+    private static final String PG_INTEGER = "int4";
+    private static final String PG_INTEGER_ARRAY = "_int4";
+    private static final String PG_BIGINT = "int8";
+    private static final String PG_BIGINT_ARRAY = "_int8";
+    private static final String PG_REAL = "float4";
+    private static final String PG_REAL_ARRAY = "_float4";
+    private static final String PG_DOUBLE_PRECISION = "float8";
+    private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
+    private static final String PG_NUMERIC = "numeric";
+    private static final String PG_NUMERIC_ARRAY = "_numeric";
+    private static final String PG_BOOLEAN = "bool";
+    private static final String PG_BOOLEAN_ARRAY = "_bool";
+    private static final String PG_TIMESTAMP = "timestamp";
+    private static final String PG_TIMESTAMP_ARRAY = "_timestamp";
+    private static final String PG_TIMESTAMPTZ = "timestamptz";
+    private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
+    private static final String PG_DATE = "date";
+    private static final String PG_DATE_ARRAY = "_date";
+    private static final String PG_TIME = "time";
+    private static final String PG_TIME_ARRAY = "_time";
+    private static final String PG_TEXT = "text";
+    private static final String PG_TEXT_ARRAY = "_text";
+    private static final String PG_CHAR = "bpchar";
+    private static final String PG_CHAR_ARRAY = "_bpchar";
+    private static final String PG_CHARACTER = "character";
+    private static final String PG_CHARACTER_ARRAY = "_character";
+    private static final String PG_CHARACTER_VARYING = "varchar";
+    private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        return toSeaTunnelType(connectorDataType, new HashMap<>(0));
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            String connectorDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        checkNotNull(connectorDataType, "Postgres Type cannot be null");
+        switch (connectorDataType) {
+            case PG_BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case PG_BOOLEAN_ARRAY:
+                return ArrayType.BOOLEAN_ARRAY_TYPE;
+            case PG_BYTEA:
+                return PrimitiveByteArrayType.INSTANCE;
+            case PG_BYTEA_ARRAY:
+                return ArrayType.BYTE_ARRAY_TYPE;
+            case PG_SMALLINT:
+            case PG_SMALLSERIAL:
+            case PG_INTEGER:
+            case PG_SERIAL:
+                return BasicType.INT_TYPE;
+            case PG_SMALLINT_ARRAY:
+            case PG_INTEGER_ARRAY:
+                return ArrayType.INT_ARRAY_TYPE;
+            case PG_BIGINT:
+            case PG_BIGSERIAL:
+                return BasicType.LONG_TYPE;
+            case PG_BIGINT_ARRAY:
+                return ArrayType.LONG_ARRAY_TYPE;
+            case PG_REAL:
+                return BasicType.FLOAT_TYPE;
+            case PG_REAL_ARRAY:
+                return ArrayType.FLOAT_ARRAY_TYPE;
+            case PG_DOUBLE_PRECISION:
+                return BasicType.DOUBLE_TYPE;
+            case PG_DOUBLE_PRECISION_ARRAY:
+                return ArrayType.DOUBLE_ARRAY_TYPE;
+            case PG_NUMERIC:
+                int precision =
+                        MapUtils.getInteger(dataTypeProperties, PRECISION, DEFAULT_PRECISION);
+                ;
+                int scale = MapUtils.getInteger(dataTypeProperties, SCALE, DEFAULT_SCALE);
+                return new DecimalType(precision, scale);
+            case PG_CHAR:
+            case PG_CHARACTER:
+            case PG_CHARACTER_VARYING:
+            case PG_TEXT:
+                return BasicType.STRING_TYPE;
+            case PG_CHAR_ARRAY:
+            case PG_CHARACTER_ARRAY:
+            case PG_CHARACTER_VARYING_ARRAY:
+            case PG_TEXT_ARRAY:
+                return ArrayType.STRING_ARRAY_TYPE;
+            case PG_TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case PG_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case PG_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+
+            case PG_TIMESTAMP_ARRAY:
+            case PG_NUMERIC_ARRAY:
+            case PG_TIMESTAMPTZ:
+            case PG_TIMESTAMPTZ_ARRAY:
+            case PG_TIME_ARRAY:
+            case PG_DATE_ARRAY:
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Doesn't support POSTGRES type '%s''  yet.", connectorDataType));
+        }
+    }
+
+    @Override
+    public String toConnectorType(
+            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+        switch (sqlType) {
+            case TINYINT:
+            case SMALLINT:
+                return PG_SMALLINT;
+            case INT:
+                return PG_INTEGER;
+            case BIGINT:
+                return PG_BIGINT;
+            case DECIMAL:
+                return PG_NUMERIC;
+            case FLOAT:
+                return PG_REAL;
+            case DOUBLE:
+                return PG_DOUBLE_PRECISION;
+            case BOOLEAN:
+                return PG_BOOLEAN;
+            case STRING:
+                return PG_TEXT;
+            case DATE:
+                return PG_DATE;
+            case BYTES:
+                return PG_BYTEA;
+            case TIME:
+                return PG_TIME;
+            case TIMESTAMP:
+                return PG_TIMESTAMP;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Doesn't support SeaTunnel type '%s''  yet.", seaTunnelDataType));
+        }
+    }
+
+    @Override
+    public String getIdentity() {
+        return "POSTGRES";
+    }
+}