You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/09/26 11:10:10 UTC
[flink] branch release-1.16 updated: [FLINK-29386][hive] Fix failed to compile flink-connector-hive in hive3 profile (#20882) (#20900)
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 21444e5eec7 [FLINK-29386][hive] Fix failed to compile flink-connector-hive in hive3 profile (#20882) (#20900)
21444e5eec7 is described below
commit 21444e5eec7ccf25a54ba0675105ab47a4096ac7
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Mon Sep 26 19:09:46 2022 +0800
[FLINK-29386][hive] Fix failed to compile flink-connector-hive in hive3 profile (#20882) (#20900)
---
.../planner/delegation/hive/HiveShowTableUtils.java | 11 +++++++++--
.../hive/parse/HiveParserDDLSemanticAnalyzer.java | 18 ++++++++++++++----
.../flink/connectors/hive/HiveDialectITCase.java | 15 +++++++++++----
3 files changed, 34 insertions(+), 10 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java
index e0c6f36b720..e1f72658a3c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -47,6 +46,14 @@ import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_
/** Utils for Hive's SHOW TABLE statement. */
public class HiveShowTableUtils {
+ // the default serialization format.
+ // In Hive2, the value is
+ // org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_SERIALIZATION_FORMAT
+ // In Hive3, the value is org.apache.hadoop.hive.metastore.DEFAULT_SERIALIZATION_FORMAT.
+ // Both of them are "1", but they are defined in different classes,
+ // so we copy the value here.
+ private static final String DEFAULT_SERIALIZATION_FORMAT = "1";
+
/** Construct the string for SHOW CREATE TABLE statement. Most of the logic is from Hive's. */
public static String showCreateTable(ObjectPath tablePath, Table tbl) {
boolean needsLocation;
@@ -190,7 +197,7 @@ public class HiveShowTableUtils {
if (tbl.getStorageHandler() == null) {
// If serialization.format property has the default value, it will not to be
// included in SERDE properties
- if (MetaStoreUtils.DEFAULT_SERIALIZATION_FORMAT.equals(
+ if (DEFAULT_SERIALIZATION_FORMAT.equals(
serdeParams.get(serdeConstants.SERIALIZATION_FORMAT))) {
serdeParams.remove(serdeConstants.SERIALIZATION_FORMAT);
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index 7ca9bdb5cb7..3ef4f1ba879 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -52,6 +52,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
@@ -705,14 +706,23 @@ public class HiveParserDDLSemanticAnalyzer {
HiveParserBaseSemanticAnalyzer.getQualifiedTableName(
(HiveParserASTNode) ast.getChild(0));
ObjectPath tablePath = new ObjectPath(qualTabName[0], qualTabName[1]);
- Table table = getTable(tablePath);
- if (table.getTableType() == TableType.INDEX_TABLE) {
- throw new SemanticException(
- ErrorMsg.SHOW_CREATETABLE_INDEX.getMsg(table + " has table type INDEX_TABLE"));
+ if (!isHive310OrLater()) {
+ // Before Hive 3, Hive checks whether the table is an index table.
+ Table table = getTable(tablePath);
+ if (table.getTableType().name().equals("INDEX_TABLE")) {
+ throw new SemanticException(
+ String.format(
+ "SHOW CREATE TABLE does not support tables of type INDEX_TABLE.. %s has has table type INDEX_TABLE.",
+ tablePath));
+ }
}
return new HiveShowCreateTableOperation(tablePath);
}
+ private boolean isHive310OrLater() {
+ return HiveShimLoader.getHiveVersion().compareTo(HiveShimLoader.HIVE_VERSION_V3_1_0) >= 0;
+ }
+
private Operation convertAlterView(HiveParserASTNode ast) throws SemanticException {
Operation operation = null;
String[] qualified =
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index a878510f948..81487871c78 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -1205,6 +1205,15 @@ public class HiveDialectITCase {
.getField(0);
Table table = hiveCatalog.getHiveTable(new ObjectPath("default", "t2"));
String expectLastDdlTime = table.getParameters().get("transient_lastDdlTime");
+ String expectedTableProperties =
+ String.format(
+ "%s 'k1'='v1', \n 'transient_lastDdlTime'='%s'",
+ // if it's hive 3.x, table properties should also contain
+ // 'bucketing_version'='2'
+ HiveVersionTestUtil.HIVE_310_OR_LATER
+ ? " 'bucketing_version'='2', \n"
+ : "",
+ expectLastDdlTime);
String expectedResult =
String.format(
"CREATE TABLE `default.t2`(\n"
@@ -1222,10 +1231,8 @@ public class HiveDialectITCase {
+ " 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'\n"
+ "LOCATION\n"
+ " 'file:%s'\n"
- + "TBLPROPERTIES (\n"
- + " 'k1'='v1', \n"
- + " 'transient_lastDdlTime'='%s')\n",
- warehouse + "/t2", expectLastDdlTime);
+ + "TBLPROPERTIES (\n%s)\n",
+ warehouse + "/t2", expectedTableProperties);
assertThat(actualResult).isEqualTo(expectedResult);
}