You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/05/21 17:48:00 UTC
[incubator-seatunnel] branch dev updated: support read hive by jdbc (#1926)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5c28ac5a support read hive by jdbc (#1926)
5c28ac5a is described below
commit 5c28ac5aca588c9ce3002500ce60a5096997166c
Author: 沫 <32...@njau.edu.cn>
AuthorDate: Sun May 22 01:47:55 2022 +0800
support read hive by jdbc (#1926)
Co-authored-by: zhoutao.tobeone <zh...@bytedance.com>
---
.../flink/jdbc/input/HiveTypeInformationMap.java | 77 ++++++++++++++++++++++
.../seatunnel/flink/jdbc/source/JdbcSource.java | 7 +-
2 files changed, 83 insertions(+), 1 deletion(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/input/HiveTypeInformationMap.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/input/HiveTypeInformationMap.java
new file mode 100644
index 00000000..7e004764
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/input/HiveTypeInformationMap.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.jdbc.input;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BYTE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.VOID_TYPE_INFO;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+public class HiveTypeInformationMap implements TypeInformationMap {
+
+ private static final Map<String, TypeInformation<?>> INFORMATION_MAP = new HashMap<>();
+
+ //The following mapping relationship reference pages
+ //https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBCDataTypes
+ static {
+ //Numeric Data Types
+ INFORMATION_MAP.put("tinyint", INT_TYPE_INFO);
+ INFORMATION_MAP.put("smallint", SHORT_TYPE_INFO);
+ INFORMATION_MAP.put("int", INT_TYPE_INFO);
+ INFORMATION_MAP.put("bigint", LONG_TYPE_INFO);
+ INFORMATION_MAP.put("float", FLOAT_TYPE_INFO);
+ INFORMATION_MAP.put("double", DOUBLE_TYPE_INFO);
+ INFORMATION_MAP.put("decimal", BIG_DEC_TYPE_INFO);
+ INFORMATION_MAP.put("boolean", BOOLEAN_TYPE_INFO);
+
+ //String Data Types
+ INFORMATION_MAP.put("char", STRING_TYPE_INFO);
+ INFORMATION_MAP.put("varchar", STRING_TYPE_INFO);
+ INFORMATION_MAP.put("binary", STRING_TYPE_INFO);
+ INFORMATION_MAP.put("string", STRING_TYPE_INFO);
+ INFORMATION_MAP.put("void", VOID_TYPE_INFO);
+
+ //Date and Time Data Types
+ INFORMATION_MAP.put("date", SqlTimeTypeInfo.DATE);
+ INFORMATION_MAP.put("interval_day_time", SqlTimeTypeInfo.DATE);
+ INFORMATION_MAP.put("interval_year_month", SqlTimeTypeInfo.DATE);
+ INFORMATION_MAP.put("timestamp", SqlTimeTypeInfo.TIMESTAMP);
+
+ //Complex Data Types
+ INFORMATION_MAP.put("map", STRING_TYPE_INFO);
+ INFORMATION_MAP.put("array", STRING_TYPE_INFO);
+ INFORMATION_MAP.put("struct", STRING_TYPE_INFO);
+ }
+
+ @Override
+ public TypeInformation<?> getInformation(String datatype) {
+ return INFORMATION_MAP.get(datatype);
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
index e7433f79..a8540f46 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
@@ -39,6 +39,7 @@ import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSource;
import org.apache.seatunnel.flink.jdbc.input.DefaultTypeInformationMap;
+import org.apache.seatunnel.flink.jdbc.input.HiveTypeInformationMap;
import org.apache.seatunnel.flink.jdbc.input.JdbcInputFormat;
import org.apache.seatunnel.flink.jdbc.input.MysqlTypeInformationMap;
import org.apache.seatunnel.flink.jdbc.input.OracleTypeInformationMap;
@@ -194,10 +195,12 @@ public class JdbcSource implements FlinkBatchSource {
String databaseDialect = connection.getMetaData().getDatabaseProductName();
PreparedStatement preparedStatement = connection.prepareStatement(selectSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
preparedStatement.setMaxRows(1);
- ResultSetMetaData rsMeta = preparedStatement.getMetaData();
try {
+ // Prefer PreparedStatement#getMetaData(): it can describe the result columns without executing the query.
+ ResultSetMetaData rsMeta = preparedStatement.getMetaData();
return getRowInfo(rsMeta, databaseDialect);
} catch (SQLException e) {
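+ // Fallback for drivers whose PreparedStatement#getMetaData() is unsupported or fails: execute the query (capped at one row by setMaxRows(1)) and read the metadata from the ResultSet instead.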
ResultSet rs = preparedStatement.executeQuery();
return getRowInfo(rs.getMetaData(), databaseDialect);
}
@@ -245,6 +248,8 @@ public class JdbcSource implements FlinkBatchSource {
return new PostgresTypeInformationMap();
} else if (StringUtils.containsIgnoreCase(databaseDialect, "oracle")) {
return new OracleTypeInformationMap();
+ } else if (StringUtils.containsIgnoreCase(databaseDialect, "Hive")){
+ return new HiveTypeInformationMap();
} else {
return new DefaultTypeInformationMap();
}