Posted to commits@flink.apache.org by sh...@apache.org on 2022/09/26 11:10:10 UTC

[flink] branch release-1.16 updated: [FLINK-29386][hive] Fix failed to compile flink-connector-hive in hive3 profile (#20882) (#20900)

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 21444e5eec7 [FLINK-29386][hive] Fix failed to compile flink-connector-hive in hive3 profile (#20882) (#20900)
21444e5eec7 is described below

commit 21444e5eec7ccf25a54ba0675105ab47a4096ac7
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Mon Sep 26 19:09:46 2022 +0800

    [FLINK-29386][hive] Fix failed to compile flink-connector-hive in hive3 profile (#20882) (#20900)
---
 .../planner/delegation/hive/HiveShowTableUtils.java    | 11 +++++++++--
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java      | 18 ++++++++++++++----
 .../flink/connectors/hive/HiveDialectITCase.java       | 15 +++++++++++----
 3 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java
index e0c6f36b720..e1f72658a3c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -47,6 +46,14 @@ import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_
 /** Utils for Hive's SHOW TABLE statement. */
 public class HiveShowTableUtils {
 
+    // The default serialization format.
+    // In Hive 2, the value is
+    // org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_SERIALIZATION_FORMAT.
+    // In Hive 3, the value is org.apache.hadoop.hive.metastore.DEFAULT_SERIALIZATION_FORMAT.
+    // Both values are "1", but they live in different classes, so we port the
+    // literal here to compile against both Hive 2 and Hive 3.
+    private static final String DEFAULT_SERIALIZATION_FORMAT = "1";
+
     /** Construct the string for SHOW CREATE TABLE statement. Most of the logic is from Hive's. */
     public static String showCreateTable(ObjectPath tablePath, Table tbl) {
         boolean needsLocation;
@@ -190,7 +197,7 @@ public class HiveShowTableUtils {
             if (tbl.getStorageHandler() == null) {
                 // If the serialization.format property has the default value, it will not be
                 // included in SERDE properties
-                if (MetaStoreUtils.DEFAULT_SERIALIZATION_FORMAT.equals(
+                if (DEFAULT_SERIALIZATION_FORMAT.equals(
                         serdeParams.get(serdeConstants.SERIALIZATION_FORMAT))) {
                     serdeParams.remove(serdeConstants.SERIALIZATION_FORMAT);
                 }
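
Aside: a minimal sketch of an alternative to inlining the literal, not part of
this commit. The constant could instead be resolved reflectively at runtime, so
that neither Hive class is referenced at compile time. The Hive 3 holder class
below is an assumption; only the Hive 2 location is confirmed by the comment in
the hunk above.

    import java.lang.reflect.Field;

    // Hypothetical helper (sketch only): look the constant up at runtime.
    final class DefaultSerializationFormat {

        static String resolve() {
            // Candidate holder classes; the Hive 3 entry is an assumption.
            String[] candidates = {
                "org.apache.hadoop.hive.metastore.MetaStoreUtils", // Hive 2
                "org.apache.hadoop.hive.metastore.ColumnType" // Hive 3 (assumed)
            };
            for (String className : candidates) {
                try {
                    Field field =
                            Class.forName(className).getField("DEFAULT_SERIALIZATION_FORMAT");
                    return (String) field.get(null);
                } catch (ReflectiveOperationException e) {
                    // Class or field absent in this Hive version; try the next candidate.
                }
            }
            // Neither lookup succeeded: fall back to the known value, as the commit does.
            return "1";
        }
    }

Inlining the literal, as the commit does, avoids reflection at runtime and keeps
the code readable; the value "1" is the same on both Hive lines.
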
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index 7ca9bdb5cb7..3ef4f1ba879 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -52,6 +52,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
@@ -705,14 +706,23 @@ public class HiveParserDDLSemanticAnalyzer {
                 HiveParserBaseSemanticAnalyzer.getQualifiedTableName(
                         (HiveParserASTNode) ast.getChild(0));
         ObjectPath tablePath = new ObjectPath(qualTabName[0], qualTabName[1]);
-        Table table = getTable(tablePath);
-        if (table.getTableType() == TableType.INDEX_TABLE) {
-            throw new SemanticException(
-                    ErrorMsg.SHOW_CREATETABLE_INDEX.getMsg(table + " has table type INDEX_TABLE"));
+        if (!isHive310OrLater()) {
+            // Before Hive 3, Hive checks whether the table is an index table.
+            Table table = getTable(tablePath);
+            if (table.getTableType().name().equals("INDEX_TABLE")) {
+                throw new SemanticException(
+                        String.format(
+                                "SHOW CREATE TABLE does not support tables of type INDEX_TABLE.. %s has has table type INDEX_TABLE.",
+                                tablePath));
+            }
         }
         return new HiveShowCreateTableOperation(tablePath);
     }
 
+    private boolean isHive310OrLater() {
+        return HiveShimLoader.getHiveVersion().compareTo(HiveShimLoader.HIVE_VERSION_V3_1_0) >= 0;
+    }
+
     private Operation convertAlterView(HiveParserASTNode ast) throws SemanticException {
         Operation operation = null;
         String[] qualified =
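
For context, the new isHive310OrLater() gate relies on plain string comparison.
A minimal sketch, assuming HiveShimLoader.getHiveVersion() returns dotted
version strings such as "2.3.9" or "3.1.2":

    // Demonstrates the lexicographic version gate used above (sketch only).
    public class VersionGateDemo {

        static boolean isHive310OrLater(String hiveVersion) {
            // Mirrors getHiveVersion().compareTo(HIVE_VERSION_V3_1_0) >= 0.
            return hiveVersion.compareTo("3.1.0") >= 0;
        }

        public static void main(String[] args) {
            System.out.println(isHive310OrLater("2.3.9")); // false
            System.out.println(isHive310OrLater("3.1.0")); // true
            System.out.println(isHive310OrLater("3.1.2")); // true
        }
    }

Note that a purely lexicographic comparison would misorder hypothetical versions
such as "3.10.0" versus "3.2.0"; for the single-digit version components of the
Hive releases involved here, it orders correctly.
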
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index a878510f948..81487871c78 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -1205,6 +1205,15 @@ public class HiveDialectITCase {
                                 .getField(0);
         Table table = hiveCatalog.getHiveTable(new ObjectPath("default", "t2"));
         String expectLastDdlTime = table.getParameters().get("transient_lastDdlTime");
+        String expectedTableProperties =
+                String.format(
+                        "%s  'k1'='v1', \n  'transient_lastDdlTime'='%s'",
+                        // If it's Hive 3.x, the table properties should also
+                        // contain 'bucketing_version'='2'.
+                        HiveVersionTestUtil.HIVE_310_OR_LATER
+                                ? "  'bucketing_version'='2', \n"
+                                : "",
+                        expectLastDdlTime);
         String expectedResult =
                 String.format(
                         "CREATE TABLE `default.t2`(\n"
@@ -1222,10 +1231,8 @@ public class HiveDialectITCase {
                                 + "  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'\n"
                                 + "LOCATION\n"
                                 + "  'file:%s'\n"
-                                + "TBLPROPERTIES (\n"
-                                + "  'k1'='v1', \n"
-                                + "  'transient_lastDdlTime'='%s')\n",
-                        warehouse + "/t2", expectLastDdlTime);
+                                + "TBLPROPERTIES (\n%s)\n",
+                        warehouse + "/t2", expectedTableProperties);
         assertThat(actualResult).isEqualTo(expectedResult);
     }