Posted to commits@impala.apache.org by st...@apache.org on 2022/11/25 09:07:19 UTC
[impala] branch master updated: IMPALA-3119: DDL support for bucketed tables
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 2733d039a IMPALA-3119: DDL support for bucketed tables
2733d039a is described below
commit 2733d039ad4a830a1ea34c1a75d2b666788e39a9
Author: xiabaike <xi...@163.com>
AuthorDate: Sun Sep 25 03:02:27 2022 +0000
IMPALA-3119: DDL support for bucketed tables
Add syntactic support for creating bucketed tables.
The new clause in the CREATE TABLE statement is as follows:
[CLUSTERED BY (column[, column ...]) [SORT BY (column[, column ...])]
INTO n BUCKETS]
Example:
CREATE TABLE tbl (i int COMMENT 'hello', s string)
CLUSTERED BY (i) INTO 24 BUCKETS;
CREATE TABLE tbl (i int COMMENT 'hello', s string)
CLUSTERED BY (i) SORT BY (s) INTO 24 BUCKETS;
Instructions:
1. The bucket partitioning algorithm is the same hash function that
Hive uses for its bucketed tables (see the sketch after this list);
2. CREATE TABLE statements for bucketed tables currently don't support
Kudu and Iceberg tables;
3. In the current version, alter operations (add/drop/change/replace
columns) on bucketed tables are not supported;
4. Dropping bucketed tables is supported.
This commit is the first subtask of IMPALA-3118.
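For reference, Hive's classic scheme assigns a row to a bucket via
bucket_id = (hash(bucket columns) & Integer.MAX_VALUE) % num_buckets.
A minimal Java sketch of that formula follows; it is illustrative only:
the per-value hash shown is plain Java hashCode, while tables tagged
'bucketing_version'='2' (as this commit writes) are hashed with Murmur3
in Hive.

// Illustrative sketch of Hive-style bucket assignment; the real logic
// lives in Hive's ObjectInspectorUtils.getBucketNumber().
public class BucketNumberSketch {
  // Clamp the hash to non-negative, then take it modulo the bucket count.
  static int bucketNumber(int hashCode, int numBuckets) {
    return (hashCode & Integer.MAX_VALUE) % numBuckets;
  }

  public static void main(String[] args) {
    int numBuckets = 24;  // as in CLUSTERED BY (i) INTO 24 BUCKETS
    System.out.println("value 42 -> bucket "
        + bucketNumber(Integer.hashCode(42), numBuckets));
  }
}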
Change-Id: I919b4d4139bc3a7784fa6fdb6f064e25666d548e
Reviewed-on: http://gerrit.cloudera.org:8080/19055
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
common/thrift/CatalogObjects.thrift | 19 ++++++
common/thrift/JniCatalog.thrift | 5 +-
fe/src/main/cup/sql-parser.cup | 39 ++++++++++--
.../impala/analysis/CreateTableLikeFileStmt.java | 2 +-
.../apache/impala/analysis/CreateTableStmt.java | 3 +
.../impala/analysis/DropTableOrViewStmt.java | 5 +-
.../java/org/apache/impala/analysis/TableDef.java | 72 +++++++++++++++++++++-
.../org/apache/impala/analysis/ToSqlUtils.java | 25 ++++++--
.../apache/impala/service/CatalogOpExecutor.java | 14 +++++
.../java/org/apache/impala/util/BucketUtils.java | 41 ++++++++++++
fe/src/main/jflex/sql-scanner.flex | 2 +
.../org/apache/impala/analysis/AnalyzeDDLTest.java | 51 +++++++++++++++
.../org/apache/impala/analysis/AnalyzerTest.java | 2 +-
.../org/apache/impala/analysis/ParserTest.java | 32 ++++++++--
.../java/org/apache/impala/analysis/ToSqlTest.java | 18 ++++++
.../queries/QueryTest/create-table.test | 51 +++++++++++++++
.../queries/QueryTest/show-create-table.test | 21 +++++++
tests/metadata/test_show_create_table.py | 2 +-
18 files changed, 380 insertions(+), 24 deletions(-)
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 846baa0dc..b5a7a0b4f 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -152,6 +152,15 @@ enum TIcebergPartitionTransformType {
VOID = 7
}
+// Data distribution method of a bucketed table.
+// (Easy to add more types later.)
+enum TBucketType {
+ // Non-Bucketed
+ NONE = 0
+ // For Hive compatibility, the hash function used in Hive's bucketed tables
+ HASH = 1
+}
+
struct TCompressionCodec {
// Compression codec
1: required THdfsCompression codec
@@ -185,6 +194,13 @@ struct TTableStats {
2: optional i64 total_file_bytes
}
+// Represents the bucket spec of a table.
+struct TBucketInfo {
+ 1: required TBucketType bucket_type
+ 2: optional list<string> bucket_columns
+ 3: required i32 num_bucket
+}
+
// Column stats data that Impala uses.
struct TColumnStats {
// Average size and max size, in bytes. Excludes serialization overhead.
@@ -479,6 +495,9 @@ struct THdfsTable {
// Set iff this is an acid table. The valid write ids list.
13: optional TValidWriteIdList valid_write_ids
+
+ // Bucket information for HDFS tables
+ 16: optional TBucketInfo bucket_info
}
struct THBaseTable {
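The two structs added above are what the frontend fills in when it sees
a CLUSTERED BY clause; the parser action added to sql-parser.cup further
down in this diff constructs them the same way. A minimal sketch using
the thrift-generated Java classes:

import java.util.Arrays;
import org.apache.impala.thrift.TBucketInfo;
import org.apache.impala.thrift.TBucketType;

public class BucketInfoExample {
  public static void main(String[] args) {
    // Mirrors "CLUSTERED BY (i) INTO 24 BUCKETS".
    TBucketInfo bucketInfo = new TBucketInfo(TBucketType.HASH, 24);
    bucketInfo.setBucket_columns(Arrays.asList("i"));

    // An unbucketed table is represented explicitly as NONE with 0 buckets.
    TBucketInfo none = new TBucketInfo(TBucketType.NONE, 0);
    System.out.println(bucketInfo + " / " + none);
  }
}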
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index dd87ba72b..79eb2989a 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -591,6 +591,9 @@ struct TCreateTableParams {
// Just one PartitionSpec when create iceberg table
21: optional CatalogObjects.TIcebergPartitionSpec partition_spec
+
+ // Bucket desc for the created bucketed table
+ 22: optional CatalogObjects.TBucketInfo bucket_info
}
// Parameters of a CREATE VIEW or ALTER VIEW AS SELECT command
@@ -993,4 +996,4 @@ struct TEventProcessorMetricsSummaryResponse {
// summary view of the events processor which can include status,
// metrics and other details
1: required string summary
-}
+}
\ No newline at end of file
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 99fe7f007..cc6760897 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -50,6 +50,8 @@ import org.apache.impala.catalog.StructType;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.View;
import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TBucketInfo;
+import org.apache.impala.thrift.TBucketType;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TDescribeOutputStyle;
import org.apache.impala.thrift.TFunctionCategory;
@@ -334,6 +336,7 @@ terminal BigDecimal DECIMAL_LITERAL;
terminal String STRING_LITERAL;
terminal String UNMATCHED_STRING_LITERAL;
terminal String UNEXPECTED_CHAR;
+terminal KW_CLUSTERED, KW_BUCKETS;
// IMPALA-3726 introduced the DEFAULT keyword which could break existing applications
// that use the identifier "KEYWORD" as database, column or table names. To avoid that,
@@ -387,6 +390,7 @@ nonterminal List<Expr> expr_list;
nonterminal String alias_clause;
nonterminal List<String> ident_list, primary_keys;
nonterminal List<String> opt_ident_list;
+nonterminal Pair<TBucketInfo, Pair<List<String>, TSortingOrder>> opt_clustered;
nonterminal Pair<List<String>, TSortingOrder> opt_sort_cols;
nonterminal TableName table_name;
nonterminal ColumnName column_name;
@@ -1628,16 +1632,35 @@ enable_spec ::=
;
tbl_options ::=
- opt_sort_cols:sort_cols opt_comment_val:comment opt_row_format_val:row_format
- serde_properties:serde_props file_format_create_table_val:file_format
- location_val:location opt_cache_op_val:cache_op
+ opt_clustered:clustered
+ opt_comment_val:comment
+ opt_row_format_val:row_format
+ serde_properties:serde_props
+ file_format_create_table_val:file_format
+ location_val:location
+ opt_cache_op_val:cache_op
tbl_properties:tbl_props
{:
CreateTableStmt.unescapeProperties(serde_props);
CreateTableStmt.unescapeProperties(tbl_props);
- RESULT = new TableDef.Options(sort_cols, comment, row_format, serde_props,
- file_format, location, cache_op, tbl_props, parser.getQueryOptions());
+ RESULT = new TableDef.Options(clustered.first, clustered.second, comment, row_format,
+ serde_props, file_format, location, cache_op, tbl_props, parser.getQueryOptions());
+ :}
+ ;
+
+opt_clustered ::=
+ opt_sort_cols:sort_cols
+ {:
+ TBucketInfo bucket_info = new TBucketInfo(TBucketType.NONE, 0);
+ RESULT = new Pair<TBucketInfo, Pair<List<String>, TSortingOrder>>(bucket_info, sort_cols);
+ :}
+ | KW_CLUSTERED KW_BY LPAREN opt_ident_list:col_names RPAREN opt_sort_cols:sort_cols
+ KW_INTO INTEGER_LITERAL:numBuckets KW_BUCKETS
+ {:
+ TBucketInfo bucket_info = new TBucketInfo(TBucketType.HASH, numBuckets.intValue());
+ bucket_info.setBucket_columns(col_names);
+ RESULT = new Pair<TBucketInfo, Pair<List<String>, TSortingOrder>>(bucket_info, sort_cols);
:}
;
@@ -3284,6 +3307,8 @@ plan_hints ::=
plan_hint ::=
KW_STRAIGHT_JOIN
{: RESULT = new PlanHint("straight_join"); :}
+ | KW_CLUSTERED
+ {: RESULT = new PlanHint("clustered"); :}
| IDENT:name
{: RESULT = new PlanHint(name); :}
| IDENT:name LPAREN ident_list:args RPAREN
@@ -4087,6 +4112,8 @@ word ::=
{: RESULT = r.toString(); :}
| KW_BOOLEAN:r
{: RESULT = r.toString(); :}
+ | KW_BUCKETS:r
+ {: RESULT = r.toString(); :}
| KW_BY:r
{: RESULT = r.toString(); :}
| KW_CACHED:r
@@ -4105,6 +4132,8 @@ word ::=
{: RESULT = r.toString(); :}
| KW_CLOSE_FN:r
{: RESULT = r.toString(); :}
+ | KW_CLUSTERED:r
+ {: RESULT = r.toString(); :}
| KW_COLUMN:r
{: RESULT = r.toString(); :}
| KW_COLUMNS:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
index f8a8e3d7d..d0efe18ed 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
@@ -57,7 +57,7 @@ public class CreateTableLikeFileStmt extends CreateTableStmt {
null, null, null, new Pair<>(getSortColumns(), getSortingOrder()),
getTblProperties(), getSerdeProperties(), isExternal(), getIfNotExists(),
getRowFormat(), HdfsFileFormat.fromThrift(getFileFormat()), compression, null,
- getLocation(), null);
+ getLocation(), null, null);
s = s.replace("__LIKE_FILEFORMAT__", String.format("LIKE %s '%s'",
schemaFileFormat_, schemaLocation_.toString()));
return s;
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 242ba70a8..f2452aff8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -36,6 +36,7 @@ import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TBucketInfo;
import org.apache.impala.thrift.TCompressionCodec;
import org.apache.impala.thrift.TCreateTableParams;
import org.apache.impala.thrift.THdfsCompression;
@@ -142,6 +143,7 @@ public class CreateTableStmt extends StatementBase {
public Map<String, String> getGeneratedKuduProperties() {
return tableDef_.getGeneratedProperties();
}
+ public TBucketInfo geTBucketInfo() { return tableDef_.geTBucketInfo(); }
// Only exposed for ToSqlUtils. Returns the list of primary keys declared by the user
// at the table level. Note that primary keys may also be declared in column
@@ -215,6 +217,7 @@ public class CreateTableStmt extends StatementBase {
if (getRowFormat() != null) params.setRow_format(getRowFormat().toThrift());
params.setFile_format(getFileFormat());
params.setIf_not_exists(getIfNotExists());
+ if (geTBucketInfo() != null) params.setBucket_info(geTBucketInfo());
params.setSort_columns(getSortColumns());
params.setSorting_order(getSortingOrder());
params.setTable_properties(Maps.newHashMap(getTblProperties()));
diff --git a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
index f2348976f..594b7e376 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
@@ -28,6 +28,7 @@ import org.apache.impala.thrift.TAccessEvent;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TDropTableOrViewParams;
import org.apache.impala.thrift.TTableName;
+import org.apache.impala.util.MetaStoreUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -135,7 +136,9 @@ public class DropTableOrViewStmt extends StatementBase {
throw new AnalysisException(String.format(
"DROP VIEW not allowed on a table: %s.%s", dbName_, getTbl()));
}
- if (dropTable_) {
+ // Creating bucketed tables is now supported,
+ // so dropping them must be allowed as well.
+ if (dropTable_ && !MetaStoreUtil.isBucketedTable(table.getMetaStoreTable())) {
// To drop a view needs not write capabilities, only checks for tables.
analyzer.checkTableCapability(table, Analyzer.OperationType.WRITE);
}
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index dd1465f12..05da2420a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -42,6 +42,8 @@ import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TAccessEvent;
+import org.apache.impala.thrift.TBucketInfo;
+import org.apache.impala.thrift.TBucketType;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.THdfsFileFormat;
import org.apache.impala.thrift.TQueryOptions;
@@ -150,8 +152,11 @@ class TableDef {
// Sorting order for SORT BY queries.
final TSortingOrder sortingOrder;
- Options(Pair<List<String>, TSortingOrder> sortProperties, String comment,
- RowFormat rowFormat, Map<String, String> serdeProperties,
+ // Bucket desc for CLUSTERED BY
+ final TBucketInfo bucketInfo;
+
+ Options(TBucketInfo bucketInfo, Pair<List<String>, TSortingOrder> sortProperties,
+ String comment, RowFormat rowFormat, Map<String, String> serdeProperties,
THdfsFileFormat fileFormat, HdfsUri location, HdfsCachingOp cachingOp,
Map<String, String> tblProperties, TQueryOptions queryOptions) {
this.sortCols = sortProperties.first;
@@ -168,12 +173,13 @@ class TableDef {
this.cachingOp = cachingOp;
Preconditions.checkNotNull(tblProperties);
this.tblProperties = tblProperties;
+ this.bucketInfo = bucketInfo;
}
public Options(String comment, TQueryOptions queryOptions) {
// Passing null to file format so that it uses the file format from the query option
// if specified, otherwise it will use the default file format, which is TEXT.
- this(new Pair<>(ImmutableList.of(), TSortingOrder.LEXICAL), comment,
+ this(null, new Pair<>(ImmutableList.of(), TSortingOrder.LEXICAL), comment,
RowFormat.DEFAULT_ROW_FORMAT, new HashMap<>(), /* file format */null, null,
null, new HashMap<>(), queryOptions);
}
@@ -393,6 +399,13 @@ class TableDef {
RowFormat getRowFormat() { return options_.rowFormat; }
TSortingOrder getSortingOrder() { return options_.sortingOrder; }
List<ForeignKey> getForeignKeysList() { return foreignKeysList_; }
+ TBucketInfo geTBucketInfo() { return options_.bucketInfo; }
+
+ boolean isBucketableFormat() {
+ return options_.fileFormat != THdfsFileFormat.KUDU
+ && options_.fileFormat != THdfsFileFormat.ICEBERG
+ && options_.fileFormat != THdfsFileFormat.HUDI_PARQUET;
+ }
/**
* Analyzes the parameters of a CREATE TABLE statement.
@@ -739,6 +752,10 @@ class TableDef {
options_.sortingOrder.toString()));
}
+ // Analyze bucket columns.
+ analyzeBucketColumns(options_.bucketInfo, getColumnNames(),
+ getPartitionColumnNames());
+
// Analyze sort columns.
if (options_.sortCols == null) return;
if (isKuduTable()) {
@@ -750,6 +767,55 @@ class TableDef {
getColumnTypes(), options_.sortingOrder);
}
+ private void analyzeBucketColumns(TBucketInfo bucketInfo, List<String> tableCols,
+ List<String> partitionCols) throws AnalysisException {
+ if (bucketInfo == null || bucketInfo.getBucket_type() == TBucketType.NONE) {
+ return;
+ }
+ // Bucketed tables only support HDFS file formats
+ if (!isBucketableFormat()) {
+ throw new AnalysisException(String.format("CLUSTERED BY not support fileformat: " +
+ "'%s'", options_.fileFormat));
+ }
+ if (bucketInfo.getNum_bucket() <= 0) {
+ throw new AnalysisException(String.format(
+ "Bucket's number must be greater than 0."));
+ }
+ if (bucketInfo.getBucket_columns() == null
+ || bucketInfo.getBucket_columns().size() == 0) {
+ throw new AnalysisException(String.format(
+ "Bucket columns must be not null."));
+ }
+
+ // The index of each bucket column in the list of table columns.
+ Set<Integer> colIdxs = new LinkedHashSet<>();
+ for (String bucketCol : bucketInfo.getBucket_columns()) {
+ // Make sure it's not a partition column.
+ if (partitionCols.contains(bucketCol)) {
+ throw new AnalysisException(String.format("CLUSTERED BY column list must not " +
+ "contain partition column: '%s'", bucketCol));
+ }
+
+ // Determine the index of each bucket column in the list of table columns.
+ boolean foundColumn = false;
+ for (int j = 0; j < tableCols.size(); ++j) {
+ if (tableCols.get(j).equalsIgnoreCase(bucketCol)) {
+ if (colIdxs.contains(j)) {
+ throw new AnalysisException(String.format("Duplicate column in CLUSTERED " +
+ "BY list: %s", bucketCol));
+ }
+ colIdxs.add(j);
+ foundColumn = true;
+ break;
+ }
+ }
+ if (!foundColumn) {
+ throw new AnalysisException(String.format("Could not find CLUSTERED BY column " +
+ "'%s' in table.", bucketCol));
+ }
+ }
+ }
+
private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException {
if (options_.rowFormat == null) return;
if (isKuduTable()) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index e261c310b..8136eb06c 100755
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -50,9 +50,12 @@ import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.RowFormat;
import org.apache.impala.catalog.Table;
import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TBucketInfo;
+import org.apache.impala.thrift.TBucketType;
import org.apache.impala.thrift.TIcebergCatalog;
import org.apache.impala.thrift.TSortingOrder;
import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.BucketUtils;
import org.apache.impala.util.IcebergUtil;
import org.apache.impala.util.KuduUtil;
import org.slf4j.Logger;
@@ -291,7 +294,8 @@ public class ToSqlUtils {
kuduParamsSql, new Pair<>(stmt.getSortColumns(), stmt.getSortingOrder()),
properties, stmt.getSerdeProperties(), stmt.isExternal(), stmt.getIfNotExists(),
stmt.getRowFormat(), HdfsFileFormat.fromThrift(stmt.getFileFormat()),
- HdfsCompression.NONE, null, stmt.getLocation(), icebergPartitionSpecs);
+ HdfsCompression.NONE, null, stmt.getLocation(),
+ icebergPartitionSpecs, stmt.geTBucketInfo());
}
/**
@@ -328,7 +332,7 @@ public class ToSqlUtils {
innerStmt.getSortingOrder()), properties, innerStmt.getSerdeProperties(),
innerStmt.isExternal(), innerStmt.getIfNotExists(), innerStmt.getRowFormat(),
HdfsFileFormat.fromThrift(innerStmt.getFileFormat()), HdfsCompression.NONE, null,
- innerStmt.getLocation(), icebergPartitionSpecs);
+ innerStmt.getLocation(), icebergPartitionSpecs, innerStmt.geTBucketInfo());
return createTableSql + " AS " + stmt.getQueryStmt().toSql(options);
}
@@ -372,6 +376,7 @@ public class ToSqlUtils {
HdfsCompression compression = null;
String location = isHbaseTable ? null : msTable.getSd().getLocation();
Map<String, String> serdeParameters = msTable.getSd().getSerdeInfo().getParameters();
+ TBucketInfo bucketInfo = BucketUtils.fromStorageDescriptor(msTable.getSd());
String storageHandlerClassName = table.getStorageHandlerClassName();
List<String> primaryKeySql = new ArrayList<>();
@@ -447,7 +452,7 @@ public class ToSqlUtils {
partitionColsSql, primaryKeySql, foreignKeySql, kuduPartitionByParams,
new Pair<>(sortColsSql, sortingOrder), properties, serdeParameters,
isExternal, false, rowFormat, format, compression,
- storageHandlerClassName, tableLocation, icebergPartitions);
+ storageHandlerClassName, tableLocation, icebergPartitions, bucketInfo);
}
/**
@@ -462,7 +467,8 @@ public class ToSqlUtils {
Map<String, String> tblProperties, Map<String, String> serdeParameters,
boolean isExternal, boolean ifNotExists, RowFormat rowFormat,
HdfsFileFormat fileFormat, HdfsCompression compression,
- String storageHandlerClass, HdfsUri location, String icebergPartitions) {
+ String storageHandlerClass, HdfsUri location, String icebergPartitions,
+ TBucketInfo bucketInfo) {
Preconditions.checkNotNull(tableName);
StringBuilder sb = new StringBuilder("CREATE ");
if (isExternal) sb.append("EXTERNAL ");
@@ -499,7 +505,16 @@ public class ToSqlUtils {
if (kuduPartitionByParams != null && !kuduPartitionByParams.equals("")) {
sb.append("PARTITION BY " + kuduPartitionByParams + "\n");
}
- if (sortProperties.first != null) {
+ if (bucketInfo != null && bucketInfo.getBucket_type() != TBucketType.NONE) {
+ sb.append(String.format("CLUSTERED BY (\n %s\n)\n",
+ Joiner.on(", \n ").join(bucketInfo.getBucket_columns())));
+ if (sortProperties.first != null) {
+ sb.append(String.format("SORT BY %s (\n %s\n)\n",
+ sortProperties.second.toString(),
+ Joiner.on(", \n ").join(sortProperties.first)));
+ }
+ sb.append(String.format("INTO %s BUCKETS\n", bucketInfo.getNum_bucket()));
+ } else if (sortProperties.first != null) {
sb.append(String.format("SORT BY %s (\n %s\n)\n", sortProperties.second.toString(),
Joiner.on(", \n ").join(sortProperties.first)));
}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 23712a844..ed896c42a 100755
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -174,6 +174,8 @@ import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
import org.apache.impala.thrift.TAlterTableType;
import org.apache.impala.thrift.TAlterTableUnSetTblPropertiesParams;
import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
+import org.apache.impala.thrift.TBucketInfo;
+import org.apache.impala.thrift.TBucketType;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TCatalogServiceRequestHeader;
@@ -3330,6 +3332,11 @@ public class CatalogOpExecutor {
tbl.setTableType(TableType.MANAGED_TABLE.toString());
}
+ // Set the 'bucketing_version' table parameter
+ if (params.getBucket_info() != null
+ && params.getBucket_info().getBucket_type() != TBucketType.NONE) {
+ tbl.getParameters().put("bucketing_version", "2");
+ }
tbl.setSd(createSd(params));
if (params.getPartition_columns() != null) {
// Add in any partition keys that were specified
@@ -3355,6 +3362,13 @@ public class CatalogOpExecutor {
if (params.getLocation() != null) sd.setLocation(params.getLocation());
+ // Add bucket desc
+ if (params.getBucket_info() != null
+ && params.getBucket_info().getBucket_type() != TBucketType.NONE) {
+ sd.setBucketCols(params.getBucket_info().getBucket_columns());
+ sd.setNumBuckets(params.getBucket_info().getNum_bucket());
+ }
+
// Add in all the columns
sd.setCols(buildFieldSchemaList(params.getColumns()));
return sd;
diff --git a/fe/src/main/java/org/apache/impala/util/BucketUtils.java b/fe/src/main/java/org/apache/impala/util/BucketUtils.java
new file mode 100644
index 000000000..67034d606
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/BucketUtils.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.impala.thrift.TBucketInfo;
+import org.apache.impala.thrift.TBucketType;
+
+import com.google.common.base.Preconditions;
+
+public class BucketUtils {
+
+ /**
+ * Builds a TBucketInfo from the StorageDescriptor of an HMS table.
+ */
+ public static TBucketInfo fromStorageDescriptor(StorageDescriptor sd) {
+ Preconditions.checkNotNull(sd);
+ if (sd.getBucketCols() == null || sd.getBucketCols().size() == 0
+ || sd.getNumBuckets() == 0) {
+ return new TBucketInfo(TBucketType.NONE, 0);
+ }
+ TBucketInfo bucketInfo = new TBucketInfo(TBucketType.HASH, sd.getNumBuckets());
+ bucketInfo.setBucket_columns(sd.getBucketCols());
+ return bucketInfo;
+ }
+}
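A quick sketch of the new helper's behavior, assuming an HMS
StorageDescriptor populated the way createSd() in CatalogOpExecutor
does above (ToSqlUtils calls this when reconstructing SHOW CREATE TABLE
output):

import java.util.Arrays;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.impala.thrift.TBucketInfo;
import org.apache.impala.thrift.TBucketType;
import org.apache.impala.util.BucketUtils;

public class BucketUtilsExample {
  public static void main(String[] args) {
    StorageDescriptor sd = new StorageDescriptor();
    sd.setBucketCols(Arrays.asList("a"));
    sd.setNumBuckets(4);
    // HASH bucketing on [a] into 4 buckets.
    TBucketInfo info = BucketUtils.fromStorageDescriptor(sd);
    System.out.println(info.getBucket_type() + " " + info.getNum_bucket());

    // A descriptor without bucket columns maps to NONE.
    TBucketInfo none = BucketUtils.fromStorageDescriptor(new StorageDescriptor());
    System.out.println(none.getBucket_type());  // NONE
  }
}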
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 48eb00d83..4acb7a136 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -82,6 +82,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
keywordMap.put("binary", SqlParserSymbols.KW_BINARY);
keywordMap.put("block_size", SqlParserSymbols.KW_BLOCKSIZE);
keywordMap.put("boolean", SqlParserSymbols.KW_BOOLEAN);
+ keywordMap.put("buckets", SqlParserSymbols.KW_BUCKETS);
keywordMap.put("by", SqlParserSymbols.KW_BY);
keywordMap.put("cached", SqlParserSymbols.KW_CACHED);
keywordMap.put("cascade", SqlParserSymbols.KW_CASCADE);
@@ -90,6 +91,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
keywordMap.put("change", SqlParserSymbols.KW_CHANGE);
keywordMap.put("char", SqlParserSymbols.KW_CHAR);
keywordMap.put("class", SqlParserSymbols.KW_CLASS);
+ keywordMap.put("clustered", SqlParserSymbols.KW_CLUSTERED);
keywordMap.put("close_fn", SqlParserSymbols.KW_CLOSE_FN);
keywordMap.put("column", SqlParserSymbols.KW_COLUMN);
keywordMap.put("columns", SqlParserSymbols.KW_COLUMNS);
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 49416a25e..30d02eff6 100755
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1346,6 +1346,25 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"ALTER TABLE SORT BY not supported on HBase tables.");
}
+ @Test
+ public void TestAlterBucketedTable() throws AnalysisException {
+ AnalyzesOk("alter table functional.bucketed_table rename to bucketed_table_test");
+ AnalyzesOk("drop table functional.bucketed_table");
+
+ AnalysisError("alter table functional.bucketed_table add columns (a int)",
+ "functional.bucketed_table is a bucketed table. " +
+ "Only read operations are supported on such tables.");
+ AnalysisError("alter table functional.bucketed_table change col1 default bigint",
+ "functional.bucketed_table is a bucketed table. " +
+ "Only read operations are supported on such tables.");
+ AnalysisError("alter table functional.bucketed_table replace columns (a int)",
+ "functional.bucketed_table is a bucketed table. " +
+ "Only read operations are supported on such tables.");
+ AnalysisError("alter table functional.bucketed_table drop col1",
+ "functional.bucketed_table is a bucketed table. " +
+ "Only read operations are supported on such tables.");
+ }
+
@Test
public void TestAlterView() {
// View-definition references a table.
@@ -2823,6 +2842,38 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"column: 'd'");
}
+ @Test
+ public void TestCreateBucketedTable() throws AnalysisException {
+ AnalyzesOk("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY(i) INTO 24 BUCKETS");
+ AnalyzesOk("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY(i) SORT BY (s) INTO 24 BUCKETS");
+
+ // Bucketed tables are not supported for Kudu and Iceberg tables
+ AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY (i) INTO 24 BUCKETS STORED BY KUDU", "CLUSTERED BY not " +
+ "support fileformat: 'KUDU'");
+ AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY (i) INTO 24 BUCKETS STORED BY ICEBERG",
+ "CLUSTERED BY not support fileformat: 'ICEBERG'");
+ // Bucket columns must not contain partition columns or duplicates
+ AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+ "PARTITIONED BY(dt string) CLUSTERED BY (dt) INTO 24 BUCKETS",
+ "CLUSTERED BY column list must not contain partition column: 'dt'");
+ AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY (i, i) INTO 24 BUCKETS",
+ "Duplicate column in CLUSTERED BY list: i");
+ AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY (a) INTO 24 BUCKETS",
+ "Could not find CLUSTERED BY column 'a' in table.");
+ AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY (i) INTO 0 BUCKETS",
+ "Bucket's number must be greater than 0.");
+ AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY () INTO 12 BUCKETS",
+ "Bucket columns must be not null.");
+ }
+
@Test
public void TestCreateAvroTest() {
String alltypesSchemaLoc =
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index 0fcae9d30..0882bfb64 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -932,6 +932,7 @@ public class AnalyzerTest extends FrontendTestBase {
AnalyzesOk("show column stats functional.bucketed_table");
AnalyzesOk("create table test as select * from functional.bucketed_table");
AnalyzesOk("compute stats functional.bucketed_table");
+ AnalyzesOk("drop table functional.bucketed_table");
String errorMsgBucketed = "functional.bucketed_table " +
"is a bucketed table. Only read operations are supported on such tables.";
@@ -959,7 +960,6 @@ public class AnalyzerTest extends FrontendTestBase {
AnalysisError("insert into functional.bucketed_table select * from " +
"functional.bucketed_table", errorMsgBucketed);
AnalysisError("create table test like functional.bucketed_table", errorMsgBucketed);
- AnalysisError("drop table functional.bucketed_table", errorMsgBucketed);
AnalysisError("truncate table functional.bucketed_table", errorMsgBucketed);
AnalysisError("alter table functional.bucketed_table add columns(col3 int)",
errorMsgBucketed);
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 524041c4f..7f9e70eaf 100755
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3060,6 +3060,32 @@ public class ParserTest extends FrontendTestBase {
ParserError("CREATE TABLE Foo(a int PRIMARY KEY, b int BLOCK_SIZE 1+1) " +
"STORED AS KUDU");
ParserError("CREATE TABLE Foo(a int PRIMARY KEY BLOCK_SIZE -1) STORED AS KUDU");
+
+ // Supported bucketed table syntax
+ ParsesOk("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY (i) INTO 24 BUCKETS");
+ ParsesOk("CREATE TABLE bucketed_test (i int COMMENT 'hello', a int, s string) " +
+ "CLUSTERED BY (i, a) INTO 24 BUCKETS");
+
+ ParsesOk("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+ "PARTITIONED BY(dt string) CLUSTERED BY (i) INTO 24 BUCKETS");
+ ParsesOk("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY (i) SORT BY(s) INTO 24 BUCKETS");
+ ParsesOk("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+ "PARTITIONED BY(dt string) CLUSTERED BY (i) SORT BY (s) " +
+ "INTO 24 BUCKETS");
+
+ ParserError("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY (i)");
+ ParserError("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+ "CLUSTERED INTO 24 BUCKETS ");
+ ParserError("CREATE TABLE (i int, s string) CLUSTERED INTO 24 BUCKETS");
+ ParserError("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY (i) INTO BUCKETS");
+ ParserError("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+ "PARTITIONED BY(dt string) CLUSTERED BY (i) INTO BUCKETS");
+ ParserError("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+ "CLUSTERED BY (i) INTO 12 BUCKETS SORT BY (s)");
}
@Test
@@ -4362,10 +4388,4 @@ public class ParserTest extends FrontendTestBase {
ParsesOk("--test\nSELECT 1\n");
ParsesOk("--test\nSELECT 1\n ");
}
-
- @Test
- public void TestCreateBucketedTable() {
- ParserError("Create table bucketed_tbl(order_id int, order_name string)"
- + "clustered by (order_id) into 5 buckets", "Syntax error");
- }
}
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 865a72d20..0c053ce68 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -471,6 +471,24 @@ public class ToSqlTest extends FrontendTestBase {
"SORT BY ZORDER ( int_col, id ) STORED AS TEXTFILE", true);
}
+ @Test
+ public void TestCreateBucketTable() throws AnalysisException {
+ testToSql("create table bucketed_table ( a int ) " +
+ "CLUSTERED BY ( a ) into 24 buckets", "default",
+ "CREATE TABLE default.bucketed_table ( a INT ) " +
+ "CLUSTERED BY ( a ) INTO 24 BUCKETS STORED AS TEXTFILE", true);
+ testToSql("create table bucketed_table1 ( a int ) " +
+ "partitioned by ( dt string ) CLUSTERED BY ( a ) into 24 buckets", "default",
+ "CREATE TABLE default.bucketed_table1 ( a INT ) " +
+ "PARTITIONED BY ( dt STRING ) CLUSTERED BY ( a ) INTO 24 BUCKETS " +
+ "STORED AS TEXTFILE", true);
+ testToSql("create table bucketed_table2 ( a int, s string ) partitioned " +
+ "by ( dt string ) CLUSTERED BY ( a ) sort by (s) into 24 buckets",
+ "default", "CREATE TABLE default.bucketed_table2 " +
+ "( a INT, s STRING ) PARTITIONED BY ( dt STRING ) CLUSTERED BY ( a ) SORT BY " +
+ "LEXICAL ( s ) INTO 24 BUCKETS STORED AS TEXTFILE", true);
+ }
+
@Test
public void TestCreateView() throws AnalysisException {
testToSql(
diff --git a/testdata/workloads/functional-query/queries/QueryTest/create-table.test b/testdata/workloads/functional-query/queries/QueryTest/create-table.test
index 9b3ae0c68..e293849e3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/create-table.test
@@ -309,3 +309,54 @@ describe formatted zsortbytest;
---- TYPES
STRING,STRING,STRING
====
+---- QUERY
+# Create bucketed table
+create table bucketed_test (a int, b string) CLUSTERED BY (a) into 24 buckets;
+describe formatted bucketed_test;
+---- RESULTS: VERIFY_IS_NOT_IN
+'Num Buckets: ','24 ','NULL'
+'Bucket Columns: ','[a] ','NULL'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+select a, b from bucketed_test t;
+---- RESULTS
+---- TYPES
+INT, STRING
+====
+---- QUERY
+# Create bucketed table
+create table bucketed_test2 (a int, b string) partitioned by(day string)
+CLUSTERED BY (a) into 24 buckets;
+describe formatted bucketed_test2;
+---- RESULTS: VERIFY_IS_NOT_IN
+'Num Buckets: ','24 ','NULL'
+'Bucket Columns: ','[a] ','NULL'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+select a, b from bucketed_test2 t where day = '2022-09-29';
+---- RESULTS
+---- TYPES
+INT, STRING
+====
+---- QUERY
+# Create bucketed table
+create table bucketed_test3 (a int, b string) CLUSTERED BY (a) sort by (b) into 24 buckets;
+describe formatted bucketed_test3;
+---- RESULTS: VERIFY_IS_SUBSET
+'','bucketing_version ','2 '
+'','sort.columns ','b '
+'Num Buckets: ','24 ','NULL'
+'Bucket Columns: ','[a] ','NULL'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+select a, b from bucketed_test3 t;
+---- RESULTS
+---- TYPES
+INT, STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index c2485aafa..a5293bee8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -1012,3 +1012,24 @@ LOCATION '$$location_uri$$'
TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
'engine.hive.enabled'='true', 'table_type'='ICEBERG', 'write.merge.mode'='copy-on-write')
====
+---- CREATE_TABLE
+# Test create Bucketed Table
+CREATE TABLE bucketed_test (a int, b string) CLUSTERED BY (a) into 4 buckets;
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.bucketed_test (a INT, b STRING)
+CLUSTERED BY (a) INTO 4 BUCKETS
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('bucketing_version'='2', 'external.table.purge'='TRUE')
+====
+---- CREATE_TABLE
+# Test create Bucketed Table
+CREATE TABLE bucketed_sort_test (a int, b string) CLUSTERED BY (a) sort by (b) into 4 buckets;
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.bucketed_sort_test (a INT, b STRING)
+CLUSTERED BY (a) SORT BY LEXICAL (b) INTO 4 BUCKETS
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('bucketing_version'='2', 'sort.columns'='b', 'sort.order'='LEXICAL',
+'external.table.purge'='TRUE')
+====
\ No newline at end of file
diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py
index f7fabf155..f1d44627e 100644
--- a/tests/metadata/test_show_create_table.py
+++ b/tests/metadata/test_show_create_table.py
@@ -45,7 +45,7 @@ class TestShowCreateTable(ImpalaTestSuite):
"impala.events.catalogVersion", "uuid",
"current-schema", "snapshot-count", "default-partition-spec",
"current-snapshot-id", "current-snapshot-summary",
- "current-snapshot-timestamp-ms"]
+ "current-snapshot-timestamp-ms", "sort.columns", "sort.order"]
@classmethod
def get_workload(self):