You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/21 15:08:08 UTC
[flink] 01/02: [FLINK-29151][hive] Fix "SHOW CREATE TABLE" doesn't work for Hive dialect
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8a5eec9945a3cdbd35b934508586803f470a3f2a
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Wed Sep 21 15:11:32 2022 +0800
[FLINK-29151][hive] Fix "SHOW CREATE TABLE" doesn't work for Hive dialect
This closes #20795
---
.../flink/table/catalog/hive/HiveCatalog.java | 28 +-
.../delegation/hive/HiveOperationExecutor.java | 47 +++-
.../delegation/hive/HiveShowTableUtils.java | 288 +++++++++++++++++++++
.../HiveLoadDataOperation.java | 2 +-
.../operations/HiveShowCreateTableOperation.java | 40 +++
.../hive/parse/HiveParserDDLSemanticAnalyzer.java | 18 +-
.../hive/parse/HiveParserLoadSemanticAnalyzer.java | 2 +-
.../flink/connectors/hive/HiveDialectITCase.java | 55 ++++
8 files changed, 465 insertions(+), 15 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 05ca11a6b41..b0860af8d3a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -714,17 +714,7 @@ public class HiveCatalog extends AbstractCatalog {
public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
try {
Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
- boolean isHiveTable;
- if (table.getParameters().containsKey(CatalogPropertiesUtil.IS_GENERIC)) {
- isHiveTable =
- !Boolean.parseBoolean(
- table.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC));
- } else {
- isHiveTable =
- !table.getParameters().containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR.key())
- && !table.getParameters()
- .containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR_TYPE);
- }
+ boolean isHiveTable = isHiveTable(table);
// for hive table, we add the connector property
if (isHiveTable) {
table.getParameters().put(CONNECTOR.key(), IDENTIFIER);
@@ -1828,6 +1818,22 @@ public class HiveCatalog extends AbstractCatalog {
return IDENTIFIER.equalsIgnoreCase(properties.get(CONNECTOR.key()));
}
+ @Internal
+ public static boolean isHiveTable(Table table) {
+ boolean isHiveTable;
+ if (table.getParameters().containsKey(CatalogPropertiesUtil.IS_GENERIC)) {
+ isHiveTable =
+ !Boolean.parseBoolean(
+ table.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC));
+ } else {
+ isHiveTable =
+ !table.getParameters().containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR.key())
+ && !table.getParameters()
+ .containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR_TYPE);
+ }
+ return isHiveTable;
+ }
+
@Internal
public void loadTable(
Path loadPath, ObjectPath tablePath, boolean isOverwrite, boolean isSrcLocal) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
index 664ce4e6e35..07353da1202 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
@@ -36,10 +37,14 @@ import org.apache.flink.table.operations.HiveSetOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.delegation.PlannerContext;
import org.apache.flink.table.planner.delegation.hive.copy.HiveSetProcessor;
-import org.apache.flink.table.planner.delegation.hive.operation.HiveLoadDataOperation;
+import org.apache.flink.table.planner.delegation.hive.operations.HiveLoadDataOperation;
+import org.apache.flink.table.planner.delegation.hive.operations.HiveShowCreateTableOperation;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
import java.util.Collections;
import java.util.List;
@@ -69,6 +74,8 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
return executeHiveSetOperation((HiveSetOperation) operation);
} else if (operation instanceof HiveLoadDataOperation) {
return executeHiveLoadDataOperation((HiveLoadDataOperation) operation);
+ } else if (operation instanceof HiveShowCreateTableOperation) {
+ return executeShowCreateTableOperation((HiveShowCreateTableOperation) operation);
} else if (operation instanceof ExplainOperation) {
ExplainOperation explainOperation = (ExplainOperation) operation;
if (explainOperation.getChild() instanceof HiveLoadDataOperation) {
@@ -219,4 +226,42 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
.data(Collections.singletonList(Row.of(explanation)))
.build());
}
+
+ private Optional<TableResultInternal> executeShowCreateTableOperation(
+ HiveShowCreateTableOperation showCreateTableOperation) {
+ ObjectPath tablePath = showCreateTableOperation.getTablePath();
+ Catalog currentCatalog =
+ catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null);
+ if (!(currentCatalog instanceof HiveCatalog)) {
+ throw new FlinkHiveException(
+ "Only support 'SHOW CREATE TABLE' when the current catalog is HiveCatalog in Hive dialect.");
+ }
+ HiveCatalog hiveCatalog = (HiveCatalog) currentCatalog;
+ HiveConf hiveConf = hiveCatalog.getHiveConf();
+ Hive hive;
+ Table tbl;
+ try {
+ hive = Hive.get(hiveConf);
+ tbl = hive.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+ } catch (HiveException e) {
+ throw new FlinkHiveException(String.format("Fail to get the table %s.", tablePath), e);
+ }
+
+ if (!HiveCatalog.isHiveTable(tbl.getTTable())) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "The table %s to show isn't a Hive table,"
+ + " but 'SHOW CREATE TABLE' only supports Hive table currently.",
+ tablePath));
+ }
+
+ String showCreateTableString = HiveShowTableUtils.showCreateTable(tablePath, tbl);
+ TableResultInternal resultInternal =
+ TableResultImpl.builder()
+ .resultKind(ResultKind.SUCCESS)
+ .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
+ .data(Collections.singletonList(Row.of(showCreateTableString)))
+ .build();
+ return Optional.of(resultInternal);
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java
new file mode 100644
index 00000000000..e0c6f36b720
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveShowTableUtils.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive;
+
+import org.apache.flink.table.catalog.ObjectPath;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hive.common.util.HiveStringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
+
+/** Utils for Hive's SHOW CREATE TABLE statement. */
+public class HiveShowTableUtils {
+
+ /** Constructs the string for a SHOW CREATE TABLE statement. Most of the logic is from Hive. */
+ public static String showCreateTable(ObjectPath tablePath, Table tbl) {
+ boolean needsLocation;
+ String showCreateTableString;
+ List<String> duplicateProps = new ArrayList<>();
+ needsLocation = doesTableNeedLocation(tbl);
+ if (tbl.isView()) {
+ showCreateTableString =
+ "CREATE VIEW `" + tablePath + "` AS " + tbl.getViewExpandedText();
+ } else {
+ StringBuilder createTabStringBuilder = new StringBuilder();
+ // For cases where the table is temporary
+ String tblTemp = "";
+ if (tbl.isTemporary()) {
+ duplicateProps.add("TEMPORARY");
+ tblTemp = "TEMPORARY ";
+ }
+ // For cases where the table is external
+ String tblExternal = "";
+ if (tbl.getTableType() == TableType.EXTERNAL_TABLE) {
+ duplicateProps.add("EXTERNAL");
+ tblExternal = "EXTERNAL ";
+ }
+
+ createTabStringBuilder.append(
+ String.format(
+ "CREATE %s%sTABLE `%s`",
+ tblTemp, tblExternal, tablePath.getFullName()));
+
+ // Columns
+ String tblColumns;
+ List<FieldSchema> cols = tbl.getCols();
+ List<String> columns = new ArrayList<>();
+ for (FieldSchema col : cols) {
+ String columnDesc = " `" + col.getName() + "` " + col.getType();
+ if (col.getComment() != null) {
+ columnDesc =
+ columnDesc
+ + " COMMENT '"
+ + HiveStringUtils.escapeHiveCommand(col.getComment())
+ + "'";
+ }
+ columns.add(columnDesc);
+ }
+ tblColumns = StringUtils.join(columns, ", \n");
+ createTabStringBuilder.append(String.format("(\n%s)\n", tblColumns));
+
+ // Table comment
+ String tblComment;
+ String tabComment = tbl.getProperty("comment");
+ if (tabComment != null) {
+ duplicateProps.add("comment");
+ tblComment = "COMMENT '" + HiveStringUtils.escapeHiveCommand(tabComment) + "'";
+ createTabStringBuilder.append(String.format("%s\n", tblComment));
+ }
+
+ // Partitions
+ String tblPartitions = "";
+ List<FieldSchema> partKeys = tbl.getPartitionKeys();
+ if (partKeys.size() > 0) {
+ tblPartitions += "PARTITIONED BY ( \n";
+ List<String> partCols = new ArrayList<>();
+ for (FieldSchema partKey : partKeys) {
+ String partColDesc = " `" + partKey.getName() + "` " + partKey.getType();
+ if (partKey.getComment() != null) {
+ partColDesc =
+ partColDesc
+ + " COMMENT '"
+ + HiveStringUtils.escapeHiveCommand(partKey.getComment())
+ + "'";
+ }
+ partCols.add(partColDesc);
+ }
+ tblPartitions += StringUtils.join(partCols, ", \n");
+ tblPartitions += ")";
+ }
+ if (!tblPartitions.equals("")) {
+ createTabStringBuilder.append(String.format("%s\n", tblPartitions));
+ }
+
+ // Clusters (Buckets)
+ String tblSortBucket = "";
+ List<String> buckCols = tbl.getBucketCols();
+ if (buckCols.size() > 0) {
+ duplicateProps.add("SORTBUCKETCOLSPREFIX");
+ tblSortBucket += "CLUSTERED BY ( \n ";
+ tblSortBucket += StringUtils.join(buckCols, ", \n ");
+ tblSortBucket += ") \n";
+ List<Order> sortCols = tbl.getSortCols();
+ if (sortCols.size() > 0) {
+ tblSortBucket += "SORTED BY ( \n";
+ // Order
+ List<String> sortKeys = new ArrayList<String>();
+ for (Order sortCol : sortCols) {
+ String sortKeyDesc = " " + sortCol.getCol() + " ";
+ if (sortCol.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC) {
+ sortKeyDesc = sortKeyDesc + "ASC";
+ } else if (sortCol.getOrder()
+ == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_DESC) {
+ sortKeyDesc = sortKeyDesc + "DESC";
+ }
+ sortKeys.add(sortKeyDesc);
+ }
+ tblSortBucket += StringUtils.join(sortKeys, ", \n");
+ tblSortBucket += ") \n";
+ }
+ tblSortBucket += "INTO " + tbl.getNumBuckets() + " BUCKETS";
+ createTabStringBuilder.append(String.format("%s\n", tblSortBucket));
+ }
+
+ // Skewed Info
+ StringBuilder tblSkewedInfo = new StringBuilder();
+ SkewedInfo skewedInfo = tbl.getSkewedInfo();
+ if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+ tblSkewedInfo
+ .append("SKEWED BY (")
+ .append(StringUtils.join(skewedInfo.getSkewedColNames(), ","))
+ .append(")\n");
+ tblSkewedInfo.append(" ON (");
+ List<String> colValueList = new ArrayList<>();
+ for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+ colValueList.add("('" + StringUtils.join(colValues, "','") + "')");
+ }
+ tblSkewedInfo.append(StringUtils.join(colValueList, ",")).append(")");
+ if (tbl.isStoredAsSubDirectories()) {
+ tblSkewedInfo.append("\n STORED AS DIRECTORIES");
+ }
+ createTabStringBuilder.append(String.format("%s\n", tblSkewedInfo));
+ }
+
+ // Row format (SerDe)
+ StringBuilder tblRowFormat = new StringBuilder();
+ StorageDescriptor sd = tbl.getTTable().getSd();
+ SerDeInfo serdeInfo = sd.getSerdeInfo();
+ Map<String, String> serdeParams = serdeInfo.getParameters();
+ tblRowFormat.append("ROW FORMAT SERDE \n");
+ tblRowFormat
+ .append(" '")
+ .append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
+ .append("' \n");
+ if (tbl.getStorageHandler() == null) {
+ // If the serialization.format property has the default value, it will not be
+ // included in the SERDE properties
+ if (MetaStoreUtils.DEFAULT_SERIALIZATION_FORMAT.equals(
+ serdeParams.get(serdeConstants.SERIALIZATION_FORMAT))) {
+ serdeParams.remove(serdeConstants.SERIALIZATION_FORMAT);
+ }
+ if (!serdeParams.isEmpty()) {
+ appendSerdeParams(tblRowFormat, serdeParams).append(" \n");
+ }
+ tblRowFormat
+ .append("STORED AS INPUTFORMAT \n '")
+ .append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
+ .append("' \n");
+ tblRowFormat
+ .append("OUTPUTFORMAT \n '")
+ .append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
+ .append("'");
+ } else {
+ duplicateProps.add(META_TABLE_STORAGE);
+ tblRowFormat
+ .append("STORED BY \n '")
+ .append(
+ HiveStringUtils.escapeHiveCommand(
+ tbl.getParameters().get(META_TABLE_STORAGE)))
+ .append("' \n");
+ // SerDe Properties
+ if (!serdeParams.isEmpty()) {
+ appendSerdeParams(tblRowFormat, serdeInfo.getParameters());
+ }
+ }
+ createTabStringBuilder.append(String.format("%s\n", tblRowFormat));
+
+ // table location
+ if (needsLocation) {
+ String tblLocation =
+ " '" + HiveStringUtils.escapeHiveCommand(sd.getLocation()) + "'";
+ createTabStringBuilder.append(String.format("LOCATION\n%s\n", tblLocation));
+ }
+
+ // Table properties
+ duplicateProps.addAll(
+ Arrays.stream(StatsSetupConst.TABLE_PARAMS_STATS_KEYS)
+ .collect(Collectors.toList()));
+ String tblProperties = propertiesToString(tbl.getParameters(), duplicateProps);
+ createTabStringBuilder.append(String.format("TBLPROPERTIES (\n%s)\n", tblProperties));
+ showCreateTableString = createTabStringBuilder.toString();
+ }
+ return showCreateTableString;
+ }
+
+ private static boolean doesTableNeedLocation(Table tbl) {
+ boolean retval = true;
+ if (tbl.getStorageHandler() != null) {
+ String sh = tbl.getStorageHandler().toString();
+ retval =
+ !sh.equals("org.apache.hadoop.hive.hbase.HBaseStorageHandler")
+ && !sh.equals(Constants.DRUID_HIVE_STORAGE_HANDLER_ID);
+ }
+ return retval;
+ }
+
+ private static String propertiesToString(Map<String, String> props, List<String> exclude) {
+ String propString = "";
+ if (!props.isEmpty()) {
+ Map<String, String> properties = new TreeMap<String, String>(props);
+ List<String> realProps = new ArrayList<String>();
+ for (String key : properties.keySet()) {
+ if (properties.get(key) != null && (exclude == null || !exclude.contains(key))) {
+ realProps.add(
+ " '"
+ + key
+ + "'='"
+ + HiveStringUtils.escapeHiveCommand(properties.get(key))
+ + "'");
+ }
+ }
+ propString += StringUtils.join(realProps, ", \n");
+ }
+ return propString;
+ }
+
+ private static StringBuilder appendSerdeParams(
+ StringBuilder builder, Map<String, String> serdeParam) {
+ serdeParam = new TreeMap<>(serdeParam);
+ builder.append("WITH SERDEPROPERTIES ( \n");
+ List<String> serdeCols = new ArrayList<>();
+ for (Map.Entry<String, String> entry : serdeParam.entrySet()) {
+ serdeCols.add(
+ " '"
+ + entry.getKey()
+ + "'='"
+ + HiveStringUtils.escapeHiveCommand(entry.getValue())
+ + "'");
+ }
+ builder.append(StringUtils.join(serdeCols, ", \n")).append(')');
+ return builder;
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operations/HiveLoadDataOperation.java
similarity index 99%
rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operations/HiveLoadDataOperation.java
index 339985336b5..3b53c041aa5 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operations/HiveLoadDataOperation.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.delegation.hive.operation;
+package org.apache.flink.table.planner.delegation.hive.operations;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.operations.Operation;
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operations/HiveShowCreateTableOperation.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operations/HiveShowCreateTableOperation.java
new file mode 100644
index 00000000000..4c7048f41b9
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operations/HiveShowCreateTableOperation.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive.operations;
+
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.operations.Operation;
+
+/** Operation to describe a SHOW CREATE TABLE statement. */
+public class HiveShowCreateTableOperation implements Operation {
+ private final ObjectPath tablePath;
+
+ public HiveShowCreateTableOperation(ObjectPath tablePath) {
+ this.tablePath = tablePath;
+ }
+
+ public ObjectPath getTablePath() {
+ return tablePath;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return String.format("SHOW CREATE TABLE %s", tablePath.getFullName());
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index d104a1de0f3..7ca9bdb5cb7 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -104,6 +104,7 @@ import org.apache.flink.table.planner.delegation.hive.copy.HiveParserQueryState;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserRowResolver;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserStorageFormat;
+import org.apache.flink.table.planner.delegation.hive.operations.HiveShowCreateTableOperation;
import org.apache.flink.table.planner.utils.OperationConverterUtils;
import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;
@@ -377,6 +378,9 @@ public class HiveParserDDLSemanticAnalyzer {
case HiveASTParser.TOK_DROPMACRO:
res = convertDropMacro(ast);
break;
+ case HiveASTParser.TOK_SHOW_CREATETABLE:
+ res = convertShowCreateTable(ast);
+ break;
case HiveASTParser.TOK_DESCFUNCTION:
case HiveASTParser.TOK_DESCDATABASE:
case HiveASTParser.TOK_TRUNCATETABLE:
@@ -413,7 +417,6 @@ public class HiveParserDDLSemanticAnalyzer {
case HiveASTParser.TOK_SHOW_TABLESTATUS:
case HiveASTParser.TOK_SHOW_TBLPROPERTIES:
case HiveASTParser.TOK_SHOWCONF:
- case HiveASTParser.TOK_SHOW_CREATETABLE:
default:
handleUnsupportedOperation(ast);
}
@@ -697,6 +700,19 @@ public class HiveParserDDLSemanticAnalyzer {
return new DropTempSystemFunctionOperation(macroName, ifExists);
}
+ private Operation convertShowCreateTable(HiveParserASTNode ast) throws SemanticException {
+ String[] qualTabName =
+ HiveParserBaseSemanticAnalyzer.getQualifiedTableName(
+ (HiveParserASTNode) ast.getChild(0));
+ ObjectPath tablePath = new ObjectPath(qualTabName[0], qualTabName[1]);
+ Table table = getTable(tablePath);
+ if (table.getTableType() == TableType.INDEX_TABLE) {
+ throw new SemanticException(
+ ErrorMsg.SHOW_CREATETABLE_INDEX.getMsg(table + " has table type INDEX_TABLE"));
+ }
+ return new HiveShowCreateTableOperation(tablePath);
+ }
+
private Operation convertAlterView(HiveParserASTNode ast) throws SemanticException {
Operation operation = null;
String[] qualified =
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java
index 81047f2e5d6..48913291703 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.delegation.hive.parse;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.TableSpec;
-import org.apache.flink.table.planner.delegation.hive.operation.HiveLoadDataOperation;
+import org.apache.flink.table.planner.delegation.hive.operations.HiveLoadDataOperation;
import org.apache.flink.util.StringUtils;
import org.antlr.runtime.tree.Tree;
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 42e3a2ba908..14e32815dbd 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -1174,6 +1174,61 @@ public class HiveDialectITCase {
.hasMessage("ADD ARCHIVE is not supported yet. Usage: ADD JAR <file_path>");
}
+ @Test
+ public void testShowCreateTable() throws Exception {
+ tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ tableEnv.executeSql(
+ "create table t1(id BIGINT,\n"
+ + " name STRING) WITH (\n"
+ + " 'connector' = 'datagen' "
+ + ")");
+ tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tableEnv.executeSql(
+ "create table t2 (key string, value string) comment 'show create table' partitioned by (a string, b int)"
+ + " tblproperties ('k1' = 'v1')");
+
+ // should throw an exception when showing a non-Hive table
+ assertThatThrownBy(() -> tableEnv.executeSql("show create table t1"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ String.format(
+ "The table %s to show isn't a Hive table,"
+ + " but 'SHOW CREATE TABLE' only supports Hive table currently.",
+ "default.t1"));
+
+ // show hive table
+ String actualResult =
+ (String)
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("show create table t2").collect())
+ .get(0)
+ .getField(0);
+ Table table = hiveCatalog.getHiveTable(new ObjectPath("default", "t2"));
+ String expectLastDdlTime = table.getParameters().get("transient_lastDdlTime");
+ String expectedResult =
+ String.format(
+ "CREATE TABLE `default.t2`(\n"
+ + " `key` string, \n"
+ + " `value` string)\n"
+ + "COMMENT 'show create table'\n"
+ + "PARTITIONED BY ( \n"
+ + " `a` string, \n"
+ + " `b` int)\n"
+ + "ROW FORMAT SERDE \n"
+ + " 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' \n"
+ + "STORED AS INPUTFORMAT \n"
+ + " 'org.apache.hadoop.mapred.TextInputFormat' \n"
+ + "OUTPUTFORMAT \n"
+ + " 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'\n"
+ + "LOCATION\n"
+ + " 'file:%s'\n"
+ + "TBLPROPERTIES (\n"
+ + " 'k1'='v1', \n"
+ + " 'transient_lastDdlTime'='%s')\n",
+ warehouse + "/t2", expectLastDdlTime);
+ assertThat(actualResult).isEqualTo(expectedResult);
+ }
+
@Test
public void testUnsupportedOperation() {
List<String> statements =