You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/05/21 17:48:00 UTC

[incubator-seatunnel] branch dev updated: support read hive by jdbc (#1926)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5c28ac5a support read hive by jdbc (#1926)
5c28ac5a is described below

commit 5c28ac5aca588c9ce3002500ce60a5096997166c
Author: 沫 <32...@njau.edu.cn>
AuthorDate: Sun May 22 01:47:55 2022 +0800

    support read hive by jdbc (#1926)
    
    Co-authored-by: zhoutao.tobeone <zh...@bytedance.com>
---
 .../flink/jdbc/input/HiveTypeInformationMap.java   | 77 ++++++++++++++++++++++
 .../seatunnel/flink/jdbc/source/JdbcSource.java    |  7 +-
 2 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/input/HiveTypeInformationMap.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/input/HiveTypeInformationMap.java
new file mode 100644
index 00000000..7e004764
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/input/HiveTypeInformationMap.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.jdbc.input;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.VOID_TYPE_INFO;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class HiveTypeInformationMap implements TypeInformationMap {
+
+    private static final Map<String, TypeInformation<?>> INFORMATION_MAP = new HashMap<>();
+
+    //Hive type name (lower-case keys) -> Flink TypeInformation. The mappings are based on the JDBC data types listed at
+    //https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBCDataTypes
+    static {
+        //Numeric Data Types
+        INFORMATION_MAP.put("tinyint", INT_TYPE_INFO); //tinyint shares INT_TYPE_INFO with int
+        INFORMATION_MAP.put("smallint", SHORT_TYPE_INFO);
+        INFORMATION_MAP.put("int", INT_TYPE_INFO);
+        INFORMATION_MAP.put("bigint", LONG_TYPE_INFO);
+        INFORMATION_MAP.put("float", FLOAT_TYPE_INFO);
+        INFORMATION_MAP.put("double", DOUBLE_TYPE_INFO);
+        INFORMATION_MAP.put("decimal", BIG_DEC_TYPE_INFO);
+        INFORMATION_MAP.put("boolean", BOOLEAN_TYPE_INFO);
+
+        //String Data Types
+        INFORMATION_MAP.put("char", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("varchar", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("binary", STRING_TYPE_INFO); //binary is mapped to the string type info here
+        INFORMATION_MAP.put("string", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("void", VOID_TYPE_INFO);
+
+        //Date and Time Data Types
+        INFORMATION_MAP.put("date", SqlTimeTypeInfo.DATE);
+        INFORMATION_MAP.put("interval_day_time", SqlTimeTypeInfo.DATE); //interval types are mapped to DATE
+        INFORMATION_MAP.put("interval_year_month", SqlTimeTypeInfo.DATE);
+        INFORMATION_MAP.put("timestamp", SqlTimeTypeInfo.TIMESTAMP);
+
+        //Complex Data Types (all mapped to STRING_TYPE_INFO)
+        INFORMATION_MAP.put("map", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("array", STRING_TYPE_INFO);
+        INFORMATION_MAP.put("struct", STRING_TYPE_INFO);
+    }
+
+    @Override
+    public TypeInformation<?> getInformation(String datatype) {
+        return INFORMATION_MAP.get(datatype); //exact-match lookup; yields null when the type name has no entry above
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
index e7433f79..a8540f46 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
@@ -39,6 +39,7 @@ import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSource;
 import org.apache.seatunnel.flink.jdbc.input.DefaultTypeInformationMap;
+import org.apache.seatunnel.flink.jdbc.input.HiveTypeInformationMap;
 import org.apache.seatunnel.flink.jdbc.input.JdbcInputFormat;
 import org.apache.seatunnel.flink.jdbc.input.MysqlTypeInformationMap;
 import org.apache.seatunnel.flink.jdbc.input.OracleTypeInformationMap;
@@ -194,10 +195,12 @@ public class JdbcSource implements FlinkBatchSource {
             String databaseDialect = connection.getMetaData().getDatabaseProductName();
             PreparedStatement preparedStatement = connection.prepareStatement(selectSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
             preparedStatement.setMaxRows(1);
-            ResultSetMetaData rsMeta = preparedStatement.getMetaData();
             try {
+                //try PreparedStatement getMetaData() first, for drivers that support it
+                ResultSetMetaData rsMeta = preparedStatement.getMetaData();
                 return getRowInfo(rsMeta, databaseDialect);
             } catch (SQLException e) {
+                //PreparedStatement getMetaData() is not supported: fall back to ResultSet getMetaData()
                 ResultSet rs = preparedStatement.executeQuery();
                 return getRowInfo(rs.getMetaData(), databaseDialect);
             }
@@ -245,6 +248,8 @@ public class JdbcSource implements FlinkBatchSource {
             return new PostgresTypeInformationMap();
         } else if (StringUtils.containsIgnoreCase(databaseDialect, "oracle")) {
             return new OracleTypeInformationMap();
+        } else if (StringUtils.containsIgnoreCase(databaseDialect, "Hive")){
+            return new HiveTypeInformationMap();
         } else {
             return new DefaultTypeInformationMap();
         }