You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/04/19 03:45:30 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector][Jdbc] Add DataTypeConvertor for JDBC-Postgres (#4575)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 91f512597 [Feature][Connector][Jdbc] Add DataTypeConvertor for JDBC-Postgres (#4575)
91f512597 is described below
commit 91f5125976b63a3d508bd8e19307a91d994ee465
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Wed Apr 19 11:45:22 2023 +0800
[Feature][Connector][Jdbc] Add DataTypeConvertor for JDBC-Postgres (#4575)
---
.../catalog/psql/PostgresDataTypeConvertor.java | 223 +++++++++++++++++++++
1 file changed, 223 insertions(+)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
new file mode 100644
index 000000000..81bf5ca06
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import org.apache.commons.collections4.MapUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.auto.service.AutoService;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@AutoService(DataTypeConvertor.class)
+public class PostgresDataTypeConvertor implements DataTypeConvertor<String> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcDialectTypeMapper.class);
+
+ public static final String PRECISION = "precision";
+ public static final String SCALE = "scale";
+
+ public static final Integer DEFAULT_PRECISION = 38;
+
+ public static final Integer DEFAULT_SCALE = 18;
+
+ // Postgres jdbc driver maps several alias to real type, we use real type rather than alias:
+ // serial2 <=> int2
+ // smallserial <=> int2
+ // serial4 <=> serial
+ // serial8 <=> bigserial
+ // smallint <=> int2
+ // integer <=> int4
+ // int <=> int4
+ // bigint <=> int8
+ // float <=> float8
+ // boolean <=> bool
+ // decimal <=> numeric
+ private static final String PG_SMALLSERIAL = "smallserial";
+ private static final String PG_SERIAL = "serial";
+ private static final String PG_BIGSERIAL = "bigserial";
+ private static final String PG_BYTEA = "bytea";
+ private static final String PG_BYTEA_ARRAY = "_bytea";
+ private static final String PG_SMALLINT = "int2";
+ private static final String PG_SMALLINT_ARRAY = "_int2";
+ private static final String PG_INTEGER = "int4";
+ private static final String PG_INTEGER_ARRAY = "_int4";
+ private static final String PG_BIGINT = "int8";
+ private static final String PG_BIGINT_ARRAY = "_int8";
+ private static final String PG_REAL = "float4";
+ private static final String PG_REAL_ARRAY = "_float4";
+ private static final String PG_DOUBLE_PRECISION = "float8";
+ private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
+ private static final String PG_NUMERIC = "numeric";
+ private static final String PG_NUMERIC_ARRAY = "_numeric";
+ private static final String PG_BOOLEAN = "bool";
+ private static final String PG_BOOLEAN_ARRAY = "_bool";
+ private static final String PG_TIMESTAMP = "timestamp";
+ private static final String PG_TIMESTAMP_ARRAY = "_timestamp";
+ private static final String PG_TIMESTAMPTZ = "timestamptz";
+ private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
+ private static final String PG_DATE = "date";
+ private static final String PG_DATE_ARRAY = "_date";
+ private static final String PG_TIME = "time";
+ private static final String PG_TIME_ARRAY = "_time";
+ private static final String PG_TEXT = "text";
+ private static final String PG_TEXT_ARRAY = "_text";
+ private static final String PG_CHAR = "bpchar";
+ private static final String PG_CHAR_ARRAY = "_bpchar";
+ private static final String PG_CHARACTER = "character";
+ private static final String PG_CHARACTER_ARRAY = "_character";
+ private static final String PG_CHARACTER_VARYING = "varchar";
+ private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+
+ @Override
+ public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ return toSeaTunnelType(connectorDataType, new HashMap<>(0));
+ }
+
+ @Override
+ public SeaTunnelDataType<?> toSeaTunnelType(
+ String connectorDataType, Map<String, Object> dataTypeProperties)
+ throws DataTypeConvertException {
+ checkNotNull(connectorDataType, "Postgres Type cannot be null");
+ switch (connectorDataType) {
+ case PG_BOOLEAN:
+ return BasicType.BOOLEAN_TYPE;
+ case PG_BOOLEAN_ARRAY:
+ return ArrayType.BOOLEAN_ARRAY_TYPE;
+ case PG_BYTEA:
+ return PrimitiveByteArrayType.INSTANCE;
+ case PG_BYTEA_ARRAY:
+ return ArrayType.BYTE_ARRAY_TYPE;
+ case PG_SMALLINT:
+ case PG_SMALLSERIAL:
+ case PG_INTEGER:
+ case PG_SERIAL:
+ return BasicType.INT_TYPE;
+ case PG_SMALLINT_ARRAY:
+ case PG_INTEGER_ARRAY:
+ return ArrayType.INT_ARRAY_TYPE;
+ case PG_BIGINT:
+ case PG_BIGSERIAL:
+ return BasicType.LONG_TYPE;
+ case PG_BIGINT_ARRAY:
+ return ArrayType.LONG_ARRAY_TYPE;
+ case PG_REAL:
+ return BasicType.FLOAT_TYPE;
+ case PG_REAL_ARRAY:
+ return ArrayType.FLOAT_ARRAY_TYPE;
+ case PG_DOUBLE_PRECISION:
+ return BasicType.DOUBLE_TYPE;
+ case PG_DOUBLE_PRECISION_ARRAY:
+ return ArrayType.DOUBLE_ARRAY_TYPE;
+ case PG_NUMERIC:
+ int precision =
+ MapUtils.getInteger(dataTypeProperties, PRECISION, DEFAULT_PRECISION);
+ ;
+ int scale = MapUtils.getInteger(dataTypeProperties, SCALE, DEFAULT_SCALE);
+ return new DecimalType(precision, scale);
+ case PG_CHAR:
+ case PG_CHARACTER:
+ case PG_CHARACTER_VARYING:
+ case PG_TEXT:
+ return BasicType.STRING_TYPE;
+ case PG_CHAR_ARRAY:
+ case PG_CHARACTER_ARRAY:
+ case PG_CHARACTER_VARYING_ARRAY:
+ case PG_TEXT_ARRAY:
+ return ArrayType.STRING_ARRAY_TYPE;
+ case PG_TIMESTAMP:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ case PG_TIME:
+ return LocalTimeType.LOCAL_TIME_TYPE;
+ case PG_DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+
+ case PG_TIMESTAMP_ARRAY:
+ case PG_NUMERIC_ARRAY:
+ case PG_TIMESTAMPTZ:
+ case PG_TIMESTAMPTZ_ARRAY:
+ case PG_TIME_ARRAY:
+ case PG_DATE_ARRAY:
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Doesn't support POSTGRES type '%s'' yet.", connectorDataType));
+ }
+ }
+
+ @Override
+ public String toConnectorType(
+ SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+ throws DataTypeConvertException {
+ checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
+ SqlType sqlType = seaTunnelDataType.getSqlType();
+ switch (sqlType) {
+ case TINYINT:
+ case SMALLINT:
+ return PG_SMALLINT;
+ case INT:
+ return PG_INTEGER;
+ case BIGINT:
+ return PG_BIGINT;
+ case DECIMAL:
+ return PG_NUMERIC;
+ case FLOAT:
+ return PG_REAL;
+ case DOUBLE:
+ return PG_DOUBLE_PRECISION;
+ case BOOLEAN:
+ return PG_BOOLEAN;
+ case STRING:
+ return PG_TEXT;
+ case DATE:
+ return PG_DATE;
+ case BYTES:
+ return PG_BYTEA;
+ case TIME:
+ return PG_TIME;
+ case TIMESTAMP:
+ return PG_TIMESTAMP;
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Doesn't support SeaTunnel type '%s'' yet.", seaTunnelDataType));
+ }
+ }
+
+ @Override
+ public String getIdentity() {
+ return "POSTGRES";
+ }
+}