Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/06 14:15:04 UTC

[incubator-seatunnel] branch dev updated: [Bug] [flink-connector-jdbc] Change JDBC source connector to get fields from JDBC metadata and support Oracle database (#1781)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1a368dc0  [Bug] [flink-connector-jdbc] Change JDBC source connector to get fields from JDBC metadata and support Oracle database (#1781)
1a368dc0 is described below

commit 1a368dc04469b7c95366539b0b143304684735e6
Author: gleiyu <gl...@sina.cn>
AuthorDate: Fri May 6 22:14:59 2022 +0800

    [Bug] [flink-connector-jdbc] Change JDBC source connector to get fields from JDBC metadata and support Oracle database (#1781)
    
    * Get fields from JDBC metadata
    * Remove the regex pattern
    * Use StringUtils
---
 .../flink/jdbc/input/OracleTypeInformationMap.java |  64 +++++++++
 .../seatunnel/flink/jdbc/source/JdbcSource.java    | 153 ++++++++-------------
 2 files changed, 122 insertions(+), 95 deletions(-)
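
The substance of the change: instead of regex-parsing the SELECT statement for a table name and field list, the source now prepares the query and reads column labels and type names straight from ResultSetMetaData, so aliases, joins, and subqueries all resolve correctly. Below is a minimal standalone sketch of that technique; the class name, JDBC URL, credentials, and table are illustrative placeholders, not part of this commit.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MetadataFieldProbe {

        // Resolve column labels and database type names for an arbitrary SELECT:
        // prepare the statement, cap it at one row, and read ResultSetMetaData.
        // Some drivers only expose metadata after execution, hence the fallback.
        static Map<String, String> probeFields(Connection connection, String selectSql) throws SQLException {
            try (PreparedStatement ps = connection.prepareStatement(
                    selectSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
                ps.setMaxRows(1); // we want metadata, not data
                ResultSetMetaData meta = ps.getMetaData();
                if (meta != null) {
                    return collect(meta);
                }
                try (ResultSet rs = ps.executeQuery()) {
                    return collect(rs.getMetaData());
                }
            }
        }

        static Map<String, String> collect(ResultSetMetaData meta) throws SQLException {
            Map<String, String> fields = new LinkedHashMap<>(); // keep column order
            for (int i = 1; i <= meta.getColumnCount(); i++) {
                // getColumnLabel honors aliases: "SELECT id AS user_id" yields "user_id".
                fields.put(meta.getColumnLabel(i), meta.getColumnTypeName(i));
            }
            return fields;
        }

        public static void main(String[] args) throws SQLException {
            // Placeholder URL and credentials; any JDBC-accessible database works.
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/test", "root", "secret")) {
                probeFields(conn, "SELECT id, name FROM users")
                        .forEach((name, type) -> System.out.println(name + " -> " + type));
            }
        }
    }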

diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/input/OracleTypeInformationMap.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/input/OracleTypeInformationMap.java
new file mode 100644
index 00000000..eb53dcb7
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/input/OracleTypeInformationMap.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.jdbc.input;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class OracleTypeInformationMap implements TypeInformationMap {
+
+    private static final Map<String, TypeInformation<?>> INFORMATION_MAP = new HashMap<>();
+
+    static {
+        INFORMATION_MAP.put("NVARCHAR2", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("VARCHAR2", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("FLOAT", DOUBLE_TYPE_INFO);
+        INFORMATION_MAP.put("NUMBER", BIG_DEC_TYPE_INFO);
+        INFORMATION_MAP.put("LONG", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("DATE", SqlTimeTypeInfo.TIMESTAMP);
+        INFORMATION_MAP.put("RAW", BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
+        INFORMATION_MAP.put("LONG RAW", BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
+        INFORMATION_MAP.put("NCHAR", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("CHAR", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("BINARY_FLOAT", FLOAT_TYPE_INFO);
+        INFORMATION_MAP.put("BINARY_DOUBLE", DOUBLE_TYPE_INFO);
+        INFORMATION_MAP.put("ROWID", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("NCLOB", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("CLOB", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("BLOB", BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
+        INFORMATION_MAP.put("BFILE", BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
+        INFORMATION_MAP.put("TIMESTAMP", SqlTimeTypeInfo.TIMESTAMP);
+        INFORMATION_MAP.put("TIMESTAMP WITH TIME ZONE", SqlTimeTypeInfo.TIMESTAMP);
+        INFORMATION_MAP.put("TIMESTAMP WITH LOCAL TIME ZONE", SqlTimeTypeInfo.TIMESTAMP);
+    }
+
+    @Override
+    public TypeInformation<?> getInformation(String datatype) {
+        return INFORMATION_MAP.get(datatype);
+    }
+}
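
For reference, lookups against the new map use the type name the driver reports, and any name missing from the table above comes back null, so callers must treat the result as nullable. A small usage sketch (the class name below is hypothetical; XMLTYPE is just an example of an unmapped name):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.seatunnel.flink.jdbc.input.OracleTypeInformationMap;
    import org.apache.seatunnel.flink.jdbc.input.TypeInformationMap;

    public class OracleTypeLookupSketch {
        public static void main(String[] args) {
            TypeInformationMap oracleTypes = new OracleTypeInformationMap();
            System.out.println(oracleTypes.getInformation("NUMBER"));   // BIG_DEC_TYPE_INFO
            System.out.println(oracleTypes.getInformation("DATE"));     // SqlTimeTypeInfo.TIMESTAMP
            System.out.println(oracleTypes.getInformation("XMLTYPE"));  // null: name not in the map
        }
    }
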
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
index cb603884..0eee8c08 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -39,15 +40,16 @@ import org.apache.seatunnel.flink.batch.FlinkBatchSource;
 import org.apache.seatunnel.flink.jdbc.input.DefaultTypeInformationMap;
 import org.apache.seatunnel.flink.jdbc.input.JdbcInputFormat;
 import org.apache.seatunnel.flink.jdbc.input.MysqlTypeInformationMap;
+import org.apache.seatunnel.flink.jdbc.input.OracleTypeInformationMap;
 import org.apache.seatunnel.flink.jdbc.input.PostgresTypeInformationMap;
 import org.apache.seatunnel.flink.jdbc.input.TypeInformationMap;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
@@ -56,16 +58,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
-import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 public class JdbcSource implements FlinkBatchSource {
 
@@ -74,19 +74,11 @@ public class JdbcSource implements FlinkBatchSource {
     private static final int DEFAULT_FETCH_SIZE = 10000;
 
     private Config config;
-    private String tableName;
-    private String driverName;
-    private String dbUrl;
-    private String username;
     private String password;
     private int fetchSize = DEFAULT_FETCH_SIZE;
     private int parallelism = -1;
-    private Set<String> fields;
     private Map<String, TypeInformation<?>> tableFieldInfo;
 
-    private static final Pattern COMPILE = Pattern.compile("[\\s]*select[\\s]*(.*)from[\\s]*([\\S]+)(.*)",
-            Pattern.CASE_INSENSITIVE);
-
     private JdbcInputFormat jdbcInputFormat;
 
     @Override
@@ -115,13 +107,11 @@ public class JdbcSource implements FlinkBatchSource {
 
     @Override
     public void prepare(FlinkEnvironment env) {
-        driverName = config.getString(DRIVER);
-        dbUrl = config.getString(URL);
-        username = config.getString(USERNAME);
+        String driverName = config.getString(DRIVER);
+        String dbUrl = config.getString(URL);
+        String username = config.getString(USERNAME);
         String query = config.getString(QUERY);
-        Tuple2<String, Set<String>> tableNameAndFields = getTableNameAndFields(COMPILE, query);
-        tableName = tableNameAndFields.f0;
-        fields = tableNameAndFields.f1;
+
         if (config.hasPath(PASSWORD)) {
             password = config.getString(PASSWORD);
         }
@@ -140,26 +130,26 @@ public class JdbcSource implements FlinkBatchSource {
         }
 
         try (Connection connection = DriverManager.getConnection(dbUrl, username, password)) {
-            tableFieldInfo = initTableField(connection);
+            tableFieldInfo = initTableField(connection, query);
             RowTypeInfo rowTypeInfo = getRowTypeInfo();
             JdbcInputFormat.JdbcInputFormatBuilder builder = JdbcInputFormat.buildFlinkJdbcInputFormat();
             if (config.hasPath(PARTITION_COLUMN)) {
-                if (!tableFieldInfo.containsKey(config.getString(PARTITION_COLUMN))) {
-                    throw new IllegalArgumentException(String.format("field %s not contain in table %s",
-                            config.getString(PARTITION_COLUMN), tableName));
+                String partitionColumn = config.getString(PARTITION_COLUMN);
+                if (!tableFieldInfo.containsKey(partitionColumn)) {
+                    throw new IllegalArgumentException(String.format("field %s is not contained in query SQL %s",
+                            partitionColumn, query));
                 }
-                if (!isNumericType(rowTypeInfo.getTypeAt(config.getString(PARTITION_COLUMN)))) {
-                    throw new IllegalArgumentException(String.format("%s is not numeric type", PARTITION_COLUMN));
+                if (!isNumericType(rowTypeInfo.getTypeAt(partitionColumn))) {
+                    throw new IllegalArgumentException(String.format("%s is not numeric type", partitionColumn));
                 }
                 JdbcParameterValuesProvider jdbcParameterValuesProvider =
-                        initPartition(config.getString(PARTITION_COLUMN), connection);
+                        initPartition(partitionColumn, connection, query);
                 builder.setParametersProvider(jdbcParameterValuesProvider);
-                query = extendPartitionQuerySql(query, config.getString(PARTITION_COLUMN));
+                query = String.format("SELECT * FROM (%s) tt WHERE %s >= ? AND %s <= ?", query, partitionColumn, partitionColumn);
             }
             builder.setDrivername(driverName).setDBUrl(dbUrl).setUsername(username)
                     .setPassword(password).setQuery(query).setFetchSize(fetchSize)
                     .setRowTypeInfo(rowTypeInfo);
-
             jdbcInputFormat = builder.finish();
         } catch (SQLException e) {
             throw new RuntimeException("jdbc connection init failed.", e);
@@ -171,23 +161,7 @@ public class JdbcSource implements FlinkBatchSource {
         return "JdbcSource";
     }
 
-    private String extendPartitionQuerySql(String query, String column) {
-        Matcher matcher = COMPILE.matcher(query);
-        if (matcher.find()) {
-            String where = matcher.group(Integer.parseInt("3"));
-            if (where != null && where.trim().toLowerCase().startsWith("where")) {
-                // contain where
-                return query + " AND \"" + column + "\" BETWEEN ? AND ?";
-            } else {
-                // not contain where
-                return query + " WHERE \"" + column + "\" BETWEEN ? AND ?";
-            }
-        } else {
-            throw new IllegalArgumentException("sql statement format is incorrect :" + query);
-        }
-    }
-
-    private JdbcParameterValuesProvider initPartition(String columnName, Connection connection) throws SQLException {
+    private JdbcParameterValuesProvider initPartition(String columnName, Connection connection, String query) throws SQLException {
         long max = Long.MAX_VALUE;
         long min = Long.MIN_VALUE;
         if (config.hasPath(PARTITION_UPPER_BOUND) && config.hasPath(PARTITION_LOWER_BOUND)) {
@@ -196,7 +170,7 @@ public class JdbcSource implements FlinkBatchSource {
             return new JdbcNumericBetweenParametersProvider(min, max).ofBatchNum(parallelism * 2);
         }
         try (ResultSet rs = connection.createStatement().executeQuery(String.format("SELECT MAX(%s),MIN(%s) " +
-                "FROM %s", columnName, columnName, tableName))) {
+                "FROM (%s) tt", columnName, columnName, query))) {
             if (rs.next()) {
                 max = config.hasPath(PARTITION_UPPER_BOUND) ? config.getLong(PARTITION_UPPER_BOUND) :
                         Long.parseLong(rs.getString(1));
@@ -212,59 +186,47 @@ public class JdbcSource implements FlinkBatchSource {
                 || type.equals(LONG_TYPE_INFO) || type.equals(BIG_INT_TYPE_INFO);
     }
 
-    private Map<String, TypeInformation<?>> initTableField(Connection connection) {
-        Map<String, TypeInformation<?>> map = new LinkedHashMap<>();
-
+    private Map<String, TypeInformation<?>> initTableField(Connection connection, String selectSql) {
         try {
-            TypeInformationMap informationMapping = getTypeInformationMap(driverName);
-            DatabaseMetaData metaData = connection.getMetaData();
-            ResultSet columns = metaData.getColumns(connection.getCatalog(), connection.getSchema(), tableName, "%");
-            while (columns.next()) {
-                String columnName = columns.getString("COLUMN_NAME");
-                String dataTypeName = columns.getString("TYPE_NAME");
-                if (fields == null || fields.contains(columnName)) {
-                    map.put(columnName, informationMapping.getInformation(dataTypeName));
-                }
+            String databaseDialect = connection.getMetaData().getDatabaseProductName();
+            PreparedStatement preparedStatement = connection.prepareStatement(selectSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            preparedStatement.setMaxRows(1);
+            ResultSetMetaData rsMeta = preparedStatement.getMetaData();
+            try {
+                return getRowInfo(rsMeta, databaseDialect);
+            } catch (SQLException e) {
+                ResultSet rs = preparedStatement.executeQuery();
+                return getRowInfo(rs.getMetaData(), databaseDialect);
             }
-        } catch (Exception e) {
+        } catch (SQLException e) {
             LOGGER.warn("get row type info exception", e);
         }
-        return map;
+        return new LinkedHashMap<>();
     }
 
-    private Tuple2<String, Set<String>> getTableNameAndFields(Pattern regex, String selectSql) {
-        Matcher matcher = regex.matcher(selectSql);
-        String tableName;
-        Set<String> fields = null;
-        if (matcher.find()) {
-            String var = matcher.group(1);
-            tableName = matcher.group(2);
-            if (!"*".equals(var.trim())) {
-                LinkedHashSet<String> vars = new LinkedHashSet<>();
-                String[] split = var.split(",");
-                for (String s : split) {
-                    vars.add(s.trim());
-                }
-                fields = vars;
+    private Map<String, TypeInformation<?>> getRowInfo(ResultSetMetaData rsMeta, String databaseDialect) throws SQLException {
+        Map<String, TypeInformation<?>> map = new LinkedHashMap<>();
+        if (rsMeta == null) {
+            throw new SQLException("No result set metadata available to resolver row info!");
+        }
+        TypeInformationMap informationMapping = getTypeInformationMap(databaseDialect);
+        for (int i = 1; i <= rsMeta.getColumnCount(); i++) {
+            String columnName = rsMeta.getColumnLabel(i);
+            String columnTypeName = rsMeta.getColumnTypeName(i);
+            if (columnTypeName == null) {
+                throw new SQLException("Unsupported to get type info from result set metadata!");
             }
-            return new Tuple2<>(tableName, fields);
-        } else {
-            throw new IllegalArgumentException("can't find tableName and fields in sql :" + selectSql);
+            map.put(columnName, informationMapping.getInformation(columnTypeName));
         }
+        return map;
     }
 
     private RowTypeInfo getRowTypeInfo() {
         int size = tableFieldInfo.size();
-        if (fields != null && fields.size() > 0) {
-            size = fields.size();
-        } else {
-            fields = tableFieldInfo.keySet();
-        }
-
+        Set<String> fields = tableFieldInfo.keySet();
         TypeInformation<?>[] typeInformation = new TypeInformation<?>[size];
         String[] names = new String[size];
         int i = 0;
-
         for (String field : fields) {
             typeInformation[i] = tableFieldInfo.get(field);
             names[i] = field;
@@ -273,12 +235,13 @@ public class JdbcSource implements FlinkBatchSource {
         return new RowTypeInfo(typeInformation, names);
     }
 
-    private TypeInformationMap getTypeInformationMap(String driverName) {
-        driverName = driverName.toLowerCase();
-        if (driverName.contains("mysql")) {
+    private TypeInformationMap getTypeInformationMap(String databaseDialect) {
+        if (StringUtils.containsIgnoreCase(databaseDialect, "mysql")) {
             return new MysqlTypeInformationMap();
-        } else if (driverName.contains("postgresql")) {
+        } else if (StringUtils.containsIgnoreCase(databaseDialect, "postgresql")) {
             return new PostgresTypeInformationMap();
+        } else if (StringUtils.containsIgnoreCase(databaseDialect, "oracle")) {
+            return new OracleTypeInformationMap();
         } else {
             return new DefaultTypeInformationMap();
         }
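
Taken together, the partitioned read now composes for arbitrary SELECTs: the user query is wrapped as a derived table, the partition predicate binds two placeholders per split, and Flink's JdbcNumericBetweenParametersProvider supplies inclusive [start, end] ranges to fill them. A minimal sketch; the class name and sample values below are illustrative, while the provider and its methods are Flink's actual API:

    import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;

    import java.io.Serializable;

    public class PartitionSplitSketch {
        public static void main(String[] args) {
            String query = "SELECT id, name FROM users";
            String partitionColumn = "id";

            // Wrap the user query as a derived table so the predicate works for
            // any SELECT, including joins and projections with aliases.
            String partitioned = String.format(
                    "SELECT * FROM (%s) tt WHERE %s >= ? AND %s <= ?",
                    query, partitionColumn, partitionColumn);
            System.out.println(partitioned);

            // Split [min, max] into parallelism * 2 inclusive ranges; each pair
            // binds the two placeholders of one input split.
            long min = 1L;
            long max = 1000L;
            int parallelism = 4;
            Serializable[][] params = new JdbcNumericBetweenParametersProvider(min, max)
                    .ofBatchNum(parallelism * 2)
                    .getParameterValues();
            for (Serializable[] pair : params) {
                System.out.println("split: " + pair[0] + " .. " + pair[1]);
            }
        }
    }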