Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/21 15:08:08 UTC

[flink] 01/02: [FLINK-29151][hive] Fix "SHOW CREATE TABLE" doesn't work for Hive dialect

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8a5eec9945a3cdbd35b934508586803f470a3f2a
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Wed Sep 21 15:11:32 2022 +0800

    [FLINK-29151][hive] Fix "SHOW CREATE TABLE" doesn't work for Hive dialect
    
    This closes #20795
---
 .../flink/table/catalog/hive/HiveCatalog.java      |  28 +-
 .../delegation/hive/HiveOperationExecutor.java     |  47 +++-
 .../delegation/hive/HiveShowTableUtils.java        | 288 +++++++++++++++++++++
 .../HiveLoadDataOperation.java                     |   2 +-
 .../operations/HiveShowCreateTableOperation.java   |  40 +++
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  |  18 +-
 .../hive/parse/HiveParserLoadSemanticAnalyzer.java |   2 +-
 .../flink/connectors/hive/HiveDialectITCase.java   |  55 ++++
 8 files changed, 465 insertions(+), 15 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 05ca11a6b41..b0860af8d3a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -714,17 +714,7 @@ public class HiveCatalog extends AbstractCatalog {
     public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
         try {
             Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
-            boolean isHiveTable;
-            if (table.getParameters().containsKey(CatalogPropertiesUtil.IS_GENERIC)) {
-                isHiveTable =
-                        !Boolean.parseBoolean(
-                                table.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC));
-            } else {
-                isHiveTable =
-                        !table.getParameters().containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR.key())
-                                && !table.getParameters()
-                                        .containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR_TYPE);
-            }
+            boolean isHiveTable = isHiveTable(table);
             // for hive table, we add the connector property
             if (isHiveTable) {
                 table.getParameters().put(CONNECTOR.key(), IDENTIFIER);
@@ -1828,6 +1818,22 @@ public class HiveCatalog extends AbstractCatalog {
         return IDENTIFIER.equalsIgnoreCase(properties.get(CONNECTOR.key()));
     }
 
+    @Internal
+    public static boolean isHiveTable(Table table) {
+        boolean isHiveTable;
+        if (table.getParameters().containsKey(CatalogPropertiesUtil.IS_GENERIC)) {
+            isHiveTable =
+                    !Boolean.parseBoolean(
+                            table.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC));
+        } else {
+            isHiveTable =
+                    !table.getParameters().containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR.key())
+                            && !table.getParameters()
+                                    .containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR_TYPE);
+        }
+        return isHiveTable;
+    }
+
     @Internal
     public void loadTable(
             Path loadPath, ObjectPath tablePath, boolean isOverwrite, boolean isSrcLocal) {
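
The extracted static helper lets other components reuse the same Hive-vs-generic classification without going through getHiveTable(); the new SHOW CREATE TABLE executor below relies on it. A minimal standalone sketch of a caller, assuming a metastore client is already set up (the client plumbing is illustrative, not part of this commit):

    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.Table;

    public class IsHiveTableSketch {
        // Returns true for a native Hive table and false for a Flink "generic"
        // table stored in the metastore; this mirrors the check that
        // getHiveTable() now delegates to.
        public static boolean isNativeHiveTable(
                IMetaStoreClient client, String db, String tableName) throws Exception {
            Table table = client.getTable(db, tableName);
            return HiveCatalog.isHiveTable(table);
        }
    }
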
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
index 664ce4e6e35..07353da1202 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.delegation.ExtendedOperationExecutor;
@@ -36,10 +37,14 @@ import org.apache.flink.table.operations.HiveSetOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.planner.delegation.PlannerContext;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveSetProcessor;
-import org.apache.flink.table.planner.delegation.hive.operation.HiveLoadDataOperation;
+import org.apache.flink.table.planner.delegation.hive.operations.HiveLoadDataOperation;
+import org.apache.flink.table.planner.delegation.hive.operations.HiveShowCreateTableOperation;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
 
 import java.util.Collections;
 import java.util.List;
@@ -69,6 +74,8 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
             return executeHiveSetOperation((HiveSetOperation) operation);
         } else if (operation instanceof HiveLoadDataOperation) {
             return executeHiveLoadDataOperation((HiveLoadDataOperation) operation);
+        } else if (operation instanceof HiveShowCreateTableOperation) {
+            return executeShowCreateTableOperation((HiveShowCreateTableOperation) operation);
         } else if (operation instanceof ExplainOperation) {
             ExplainOperation explainOperation = (ExplainOperation) operation;
             if (explainOperation.getChild() instanceof HiveLoadDataOperation) {
@@ -219,4 +226,42 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
                         .data(Collections.singletonList(Row.of(explanation)))
                         .build());
     }
+
+    private Optional<TableResultInternal> executeShowCreateTableOperation(
+            HiveShowCreateTableOperation showCreateTableOperation) {
+        ObjectPath tablePath = showCreateTableOperation.getTablePath();
+        Catalog currentCatalog =
+                catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null);
+        if (!(currentCatalog instanceof HiveCatalog)) {
+            throw new FlinkHiveException(
+                    "Only support 'SHOW CREATE TABLE' when the current catalog is HiveCatalog in Hive dialect.");
+        }
+        HiveCatalog hiveCatalog = (HiveCatalog) currentCatalog;
+        HiveConf hiveConf = hiveCatalog.getHiveConf();
+        Hive hive;
+        Table tbl;
+        try {
+            hive = Hive.get(hiveConf);
+            tbl = hive.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+        } catch (HiveException e) {
+            throw new FlinkHiveException(String.format("Fail to get the table %s.", tablePath), e);
+        }
+
+        if (!HiveCatalog.isHiveTable(tbl.getTTable())) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "The table %s to show isn't a Hive table,"
+                                    + " but 'SHOW CREATE TABLE' only supports Hive table currently.",
+                            tablePath));
+        }
+
+        String showCreateTableString = HiveShowTableUtils.showCreateTable(tablePath, tbl);
+        TableResultInternal resultInternal =
+                TableResultImpl.builder()
+                        .resultKind(ResultKind.SUCCESS)
+                        .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
+                        .data(Collections.singletonList(Row.of(showCreateTableString)))
+                        .build();
+        return Optional.of(resultInternal);
+    }
 }
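
With the new dispatch branch, SHOW CREATE TABLE statements can be issued through a TableEnvironment running the Hive dialect, provided the current catalog is a HiveCatalog (otherwise the executor throws FlinkHiveException). A usage sketch; the catalog name, default database, hive-conf path, and table name are illustrative:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class ShowCreateTableUsageSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Illustrative catalog name, default database, and hive-site.xml directory.
            HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/opt/hive-conf");
            tableEnv.registerCatalog("myhive", hiveCatalog);
            tableEnv.useCatalog("myhive");
            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            // Prints the DDL reconstructed by HiveShowTableUtils.showCreateTable().
            tableEnv.executeSql("show create table some_table").print();
        }
    }
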
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java
new file mode 100644
index 00000000000..e0c6f36b720
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive;
+
+import org.apache.flink.table.catalog.ObjectPath;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hive.common.util.HiveStringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
+
+/** Utils for Hive's SHOW CREATE TABLE statement. */
+public class HiveShowTableUtils {
+
+    /** Constructs the string for the SHOW CREATE TABLE statement. Most of the logic is adapted from Hive. */
+    public static String showCreateTable(ObjectPath tablePath, Table tbl) {
+        boolean needsLocation;
+        String showCreateTableString;
+        List<String> duplicateProps = new ArrayList<>();
+        needsLocation = doesTableNeedLocation(tbl);
+        if (tbl.isView()) {
+            showCreateTableString =
+                    "CREATE VIEW `" + tablePath + "` AS " + tbl.getViewExpandedText();
+        } else {
+            StringBuilder createTabStringBuilder = new StringBuilder();
+            // For cases where the table is temporary
+            String tblTemp = "";
+            if (tbl.isTemporary()) {
+                duplicateProps.add("TEMPORARY");
+                tblTemp = "TEMPORARY ";
+            }
+            // For cases where the table is external
+            String tblExternal = "";
+            if (tbl.getTableType() == TableType.EXTERNAL_TABLE) {
+                duplicateProps.add("EXTERNAL");
+                tblExternal = "EXTERNAL ";
+            }
+
+            createTabStringBuilder.append(
+                    String.format(
+                            "CREATE %s%sTABLE `%s`",
+                            tblTemp, tblExternal, tablePath.getFullName()));
+
+            // Columns
+            String tblColumns;
+            List<FieldSchema> cols = tbl.getCols();
+            List<String> columns = new ArrayList<>();
+            for (FieldSchema col : cols) {
+                String columnDesc = "  `" + col.getName() + "` " + col.getType();
+                if (col.getComment() != null) {
+                    columnDesc =
+                            columnDesc
+                                    + " COMMENT '"
+                                    + HiveStringUtils.escapeHiveCommand(col.getComment())
+                                    + "'";
+                }
+                columns.add(columnDesc);
+            }
+            tblColumns = StringUtils.join(columns, ", \n");
+            createTabStringBuilder.append(String.format("(\n%s)\n", tblColumns));
+
+            // Table comment
+            String tblComment;
+            String tabComment = tbl.getProperty("comment");
+            if (tabComment != null) {
+                duplicateProps.add("comment");
+                tblComment = "COMMENT '" + HiveStringUtils.escapeHiveCommand(tabComment) + "'";
+                createTabStringBuilder.append(String.format("%s\n", tblComment));
+            }
+
+            // Partitions
+            String tblPartitions = "";
+            List<FieldSchema> partKeys = tbl.getPartitionKeys();
+            if (partKeys.size() > 0) {
+                tblPartitions += "PARTITIONED BY ( \n";
+                List<String> partCols = new ArrayList<>();
+                for (FieldSchema partKey : partKeys) {
+                    String partColDesc = "  `" + partKey.getName() + "` " + partKey.getType();
+                    if (partKey.getComment() != null) {
+                        partColDesc =
+                                partColDesc
+                                        + " COMMENT '"
+                                        + HiveStringUtils.escapeHiveCommand(partKey.getComment())
+                                        + "'";
+                    }
+                    partCols.add(partColDesc);
+                }
+                tblPartitions += StringUtils.join(partCols, ", \n");
+                tblPartitions += ")";
+            }
+            if (!tblPartitions.equals("")) {
+                createTabStringBuilder.append(String.format("%s\n", tblPartitions));
+            }
+
+            // Clusters (Buckets)
+            String tblSortBucket = "";
+            List<String> buckCols = tbl.getBucketCols();
+            if (buckCols.size() > 0) {
+                duplicateProps.add("SORTBUCKETCOLSPREFIX");
+                tblSortBucket += "CLUSTERED BY ( \n  ";
+                tblSortBucket += StringUtils.join(buckCols, ", \n  ");
+                tblSortBucket += ") \n";
+                List<Order> sortCols = tbl.getSortCols();
+                if (sortCols.size() > 0) {
+                    tblSortBucket += "SORTED BY ( \n";
+                    // Order
+                    List<String> sortKeys = new ArrayList<String>();
+                    for (Order sortCol : sortCols) {
+                        String sortKeyDesc = "  " + sortCol.getCol() + " ";
+                        if (sortCol.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC) {
+                            sortKeyDesc = sortKeyDesc + "ASC";
+                        } else if (sortCol.getOrder()
+                                == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_DESC) {
+                            sortKeyDesc = sortKeyDesc + "DESC";
+                        }
+                        sortKeys.add(sortKeyDesc);
+                    }
+                    tblSortBucket += StringUtils.join(sortKeys, ", \n");
+                    tblSortBucket += ") \n";
+                }
+                tblSortBucket += "INTO " + tbl.getNumBuckets() + " BUCKETS";
+                createTabStringBuilder.append(String.format("%s\n", tblSortBucket));
+            }
+
+            // Skewed Info
+            StringBuilder tblSkewedInfo = new StringBuilder();
+            SkewedInfo skewedInfo = tbl.getSkewedInfo();
+            if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+                tblSkewedInfo
+                        .append("SKEWED BY (")
+                        .append(StringUtils.join(skewedInfo.getSkewedColNames(), ","))
+                        .append(")\n");
+                tblSkewedInfo.append("  ON (");
+                List<String> colValueList = new ArrayList<>();
+                for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+                    colValueList.add("('" + StringUtils.join(colValues, "','") + "')");
+                }
+                tblSkewedInfo.append(StringUtils.join(colValueList, ",")).append(")");
+                if (tbl.isStoredAsSubDirectories()) {
+                    tblSkewedInfo.append("\n  STORED AS DIRECTORIES");
+                }
+                createTabStringBuilder.append(String.format("%s\n", tblSkewedInfo));
+            }
+
+            // Row format (SerDe)
+            StringBuilder tblRowFormat = new StringBuilder();
+            StorageDescriptor sd = tbl.getTTable().getSd();
+            SerDeInfo serdeInfo = sd.getSerdeInfo();
+            Map<String, String> serdeParams = serdeInfo.getParameters();
+            tblRowFormat.append("ROW FORMAT SERDE \n");
+            tblRowFormat
+                    .append("  '")
+                    .append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
+                    .append("' \n");
+            if (tbl.getStorageHandler() == null) {
+                // If the serialization.format property has the default value, it will not be
+                // included in SERDE properties
+                if (MetaStoreUtils.DEFAULT_SERIALIZATION_FORMAT.equals(
+                        serdeParams.get(serdeConstants.SERIALIZATION_FORMAT))) {
+                    serdeParams.remove(serdeConstants.SERIALIZATION_FORMAT);
+                }
+                if (!serdeParams.isEmpty()) {
+                    appendSerdeParams(tblRowFormat, serdeParams).append(" \n");
+                }
+                tblRowFormat
+                        .append("STORED AS INPUTFORMAT \n  '")
+                        .append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
+                        .append("' \n");
+                tblRowFormat
+                        .append("OUTPUTFORMAT \n  '")
+                        .append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
+                        .append("'");
+            } else {
+                duplicateProps.add(META_TABLE_STORAGE);
+                tblRowFormat
+                        .append("STORED BY \n  '")
+                        .append(
+                                HiveStringUtils.escapeHiveCommand(
+                                        tbl.getParameters().get(META_TABLE_STORAGE)))
+                        .append("' \n");
+                // SerDe Properties
+                if (!serdeParams.isEmpty()) {
+                    appendSerdeParams(tblRowFormat, serdeInfo.getParameters());
+                }
+            }
+            createTabStringBuilder.append(String.format("%s\n", tblRowFormat));
+
+            // table location
+            if (needsLocation) {
+                String tblLocation =
+                        "  '" + HiveStringUtils.escapeHiveCommand(sd.getLocation()) + "'";
+                createTabStringBuilder.append(String.format("LOCATION\n%s\n", tblLocation));
+            }
+
+            // Table properties
+            duplicateProps.addAll(
+                    Arrays.stream(StatsSetupConst.TABLE_PARAMS_STATS_KEYS)
+                            .collect(Collectors.toList()));
+            String tblProperties = propertiesToString(tbl.getParameters(), duplicateProps);
+            createTabStringBuilder.append(String.format("TBLPROPERTIES (\n%s)\n", tblProperties));
+            showCreateTableString = createTabStringBuilder.toString();
+        }
+        return showCreateTableString;
+    }
+
+    private static boolean doesTableNeedLocation(Table tbl) {
+        boolean retval = true;
+        if (tbl.getStorageHandler() != null) {
+            String sh = tbl.getStorageHandler().toString();
+            retval =
+                    !sh.equals("org.apache.hadoop.hive.hbase.HBaseStorageHandler")
+                            && !sh.equals(Constants.DRUID_HIVE_STORAGE_HANDLER_ID);
+        }
+        return retval;
+    }
+
+    private static String propertiesToString(Map<String, String> props, List<String> exclude) {
+        String propString = "";
+        if (!props.isEmpty()) {
+            Map<String, String> properties = new TreeMap<String, String>(props);
+            List<String> realProps = new ArrayList<String>();
+            for (String key : properties.keySet()) {
+                if (properties.get(key) != null && (exclude == null || !exclude.contains(key))) {
+                    realProps.add(
+                            "  '"
+                                    + key
+                                    + "'='"
+                                    + HiveStringUtils.escapeHiveCommand(properties.get(key))
+                                    + "'");
+                }
+            }
+            propString += StringUtils.join(realProps, ", \n");
+        }
+        return propString;
+    }
+
+    private static StringBuilder appendSerdeParams(
+            StringBuilder builder, Map<String, String> serdeParam) {
+        serdeParam = new TreeMap<>(serdeParam);
+        builder.append("WITH SERDEPROPERTIES ( \n");
+        List<String> serdeCols = new ArrayList<>();
+        for (Map.Entry<String, String> entry : serdeParam.entrySet()) {
+            serdeCols.add(
+                    "  '"
+                            + entry.getKey()
+                            + "'='"
+                            + HiveStringUtils.escapeHiveCommand(entry.getValue())
+                            + "'");
+        }
+        builder.append(StringUtils.join(serdeCols, ", \n")).append(')');
+        return builder;
+    }
+}
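
Both propertiesToString and appendSerdeParams route keys and values through HiveStringUtils.escapeHiveCommand so they stay safe inside the single-quoted literals of the generated DDL. A small sketch of that escaping; the input string is illustrative:

    import org.apache.hive.common.util.HiveStringUtils;

    public class EscapeSketch {
        public static void main(String[] args) {
            // escapeHiveCommand backslash-escapes single quotes and semicolons so the
            // value can sit inside a single-quoted TBLPROPERTIES/SERDEPROPERTIES literal.
            System.out.println(HiveStringUtils.escapeHiveCommand("it's a value; with quotes"));
        }
    }
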
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operations/HiveLoadDataOperation.java
similarity index 99%
rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operations/HiveLoadDataOperation.java
index 339985336b5..3b53c041aa5 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operations/HiveLoadDataOperation.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.delegation.hive.operation;
+package org.apache.flink.table.planner.delegation.hive.operations;
 
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.operations.Operation;
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operations/HiveShowCreateTableOperation.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operations/HiveShowCreateTableOperation.java
new file mode 100644
index 00000000000..4c7048f41b9
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operations/HiveShowCreateTableOperation.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive.operations;
+
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.operations.Operation;
+
+/** Operation to describe a SHOW CREATE TABLE statement. */
+public class HiveShowCreateTableOperation implements Operation {
+    private final ObjectPath tablePath;
+
+    public HiveShowCreateTableOperation(ObjectPath tablePath) {
+        this.tablePath = tablePath;
+    }
+
+    public ObjectPath getTablePath() {
+        return tablePath;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return String.format("SHOW CREATE TABLE %s", tablePath.getFullName());
+    }
+}
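
The operation is a plain carrier for the table path; its summary string is what shows up when the operation is printed or logged. A construction sketch with illustrative names:

    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.planner.delegation.hive.operations.HiveShowCreateTableOperation;

    public class OperationSketch {
        public static void main(String[] args) {
            HiveShowCreateTableOperation op =
                    new HiveShowCreateTableOperation(new ObjectPath("default", "t2"));
            System.out.println(op.asSummaryString()); // SHOW CREATE TABLE default.t2
        }
    }
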
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index d104a1de0f3..7ca9bdb5cb7 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -104,6 +104,7 @@ import org.apache.flink.table.planner.delegation.hive.copy.HiveParserQueryState;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserRowResolver;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserStorageFormat;
+import org.apache.flink.table.planner.delegation.hive.operations.HiveShowCreateTableOperation;
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
 import org.apache.flink.table.resource.ResourceType;
 import org.apache.flink.table.resource.ResourceUri;
@@ -377,6 +378,9 @@ public class HiveParserDDLSemanticAnalyzer {
             case HiveASTParser.TOK_DROPMACRO:
                 res = convertDropMacro(ast);
                 break;
+            case HiveASTParser.TOK_SHOW_CREATETABLE:
+                res = convertShowCreateTable(ast);
+                break;
             case HiveASTParser.TOK_DESCFUNCTION:
             case HiveASTParser.TOK_DESCDATABASE:
             case HiveASTParser.TOK_TRUNCATETABLE:
@@ -413,7 +417,6 @@ public class HiveParserDDLSemanticAnalyzer {
             case HiveASTParser.TOK_SHOW_TABLESTATUS:
             case HiveASTParser.TOK_SHOW_TBLPROPERTIES:
             case HiveASTParser.TOK_SHOWCONF:
-            case HiveASTParser.TOK_SHOW_CREATETABLE:
             default:
                 handleUnsupportedOperation(ast);
         }
@@ -697,6 +700,19 @@ public class HiveParserDDLSemanticAnalyzer {
         return new DropTempSystemFunctionOperation(macroName, ifExists);
     }
 
+    private Operation convertShowCreateTable(HiveParserASTNode ast) throws SemanticException {
+        String[] qualTabName =
+                HiveParserBaseSemanticAnalyzer.getQualifiedTableName(
+                        (HiveParserASTNode) ast.getChild(0));
+        ObjectPath tablePath = new ObjectPath(qualTabName[0], qualTabName[1]);
+        Table table = getTable(tablePath);
+        if (table.getTableType() == TableType.INDEX_TABLE) {
+            throw new SemanticException(
+                    ErrorMsg.SHOW_CREATETABLE_INDEX.getMsg(table + " has table type INDEX_TABLE"));
+        }
+        return new HiveShowCreateTableOperation(tablePath);
+    }
+
     private Operation convertAlterView(HiveParserASTNode ast) throws SemanticException {
         Operation operation = null;
         String[] qualified =
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java
index 81047f2e5d6..48913291703 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.delegation.hive.parse;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.TableSpec;
-import org.apache.flink.table.planner.delegation.hive.operation.HiveLoadDataOperation;
+import org.apache.flink.table.planner.delegation.hive.operations.HiveLoadDataOperation;
 import org.apache.flink.util.StringUtils;
 
 import org.antlr.runtime.tree.Tree;
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 42e3a2ba908..14e32815dbd 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -1174,6 +1174,61 @@ public class HiveDialectITCase {
                 .hasMessage("ADD ARCHIVE is not supported yet. Usage: ADD JAR <file_path>");
     }
 
+    @Test
+    public void testShowCreateTable() throws Exception {
+        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tableEnv.executeSql(
+                "create table t1(id BIGINT,\n"
+                        + "  name STRING) WITH (\n"
+                        + "  'connector' = 'datagen' "
+                        + ")");
+        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tableEnv.executeSql(
+                "create table t2 (key string, value string) comment 'show create table' partitioned by (a string, b int)"
+                        + " tblproperties ('k1' = 'v1')");
+
+        // should throw an exception when showing a non-Hive table
+        assertThatThrownBy(() -> tableEnv.executeSql("show create table t1"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        String.format(
+                                "The table %s to show isn't a Hive table,"
+                                        + " but 'SHOW CREATE TABLE' only supports Hive table currently.",
+                                "default.t1"));
+
+        // show hive table
+        String actualResult =
+                (String)
+                        CollectionUtil.iteratorToList(
+                                        tableEnv.executeSql("show create table t2").collect())
+                                .get(0)
+                                .getField(0);
+        Table table = hiveCatalog.getHiveTable(new ObjectPath("default", "t2"));
+        String expectLastDdlTime = table.getParameters().get("transient_lastDdlTime");
+        String expectedResult =
+                String.format(
+                        "CREATE TABLE `default.t2`(\n"
+                                + "  `key` string, \n"
+                                + "  `value` string)\n"
+                                + "COMMENT 'show create table'\n"
+                                + "PARTITIONED BY ( \n"
+                                + "  `a` string, \n"
+                                + "  `b` int)\n"
+                                + "ROW FORMAT SERDE \n"
+                                + "  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' \n"
+                                + "STORED AS INPUTFORMAT \n"
+                                + "  'org.apache.hadoop.mapred.TextInputFormat' \n"
+                                + "OUTPUTFORMAT \n"
+                                + "  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'\n"
+                                + "LOCATION\n"
+                                + "  'file:%s'\n"
+                                + "TBLPROPERTIES (\n"
+                                + "  'k1'='v1', \n"
+                                + "  'transient_lastDdlTime'='%s')\n",
+                        warehouse + "/t2", expectLastDdlTime);
+        assertThat(actualResult).isEqualTo(expectedResult);
+    }
+
     @Test
     public void testUnsupportedOperation() {
         List<String> statements =