Posted to commits@impala.apache.org by st...@apache.org on 2022/11/25 09:07:19 UTC

[impala] branch master updated: IMPALA-3119: DDL support for bucketed tables

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 2733d039a IMPALA-3119: DDL support for bucketed tables
2733d039a is described below

commit 2733d039ad4a830a1ea34c1a75d2b666788e39a9
Author: xiabaike <xi...@163.com>
AuthorDate: Sun Sep 25 03:02:27 2022 +0000

    IMPALA-3119: DDL support for bucketed tables
    
    Add syntactic support for creating bucketed tables.
    The syntax added to the CREATE TABLE statement is as follows:
     [CLUSTERED BY (column[, column ...]) [SORT BY (column[, column ...])]
         INTO n BUCKETS]
    
    Example:
    CREATE TABLE tbl (i int COMMENT 'hello', s string)
    CLUSTERED BY (i) INTO 24 BUCKETS;
    CREATE TABLE tbl (i int COMMENT 'hello', s string)
    CLUSTERED BY (i) SORT BY (s) INTO 24 BUCKETS;
    
    Instructions:
    1. The bucket partitioning algorithm is the hash function used in
       Hive's bucketed tables (a minimal sketch of the bucket assignment
       follows this list);
    2. CREATE TABLE statements for bucketed tables currently don't support
       Kudu and Iceberg tables;
    3. In the current version, alter operations (add/drop/change/replace
       columns) on bucketed tables are not supported;
    4. Dropping bucketed tables is supported.
    
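    For illustration only, a minimal sketch of the Hive-compatible bucket
    assignment referenced in item 1, assuming the usual
    (hash & Integer.MAX_VALUE) % numBuckets convention; the concrete hash
    function (e.g. Murmur3 when bucketing_version=2) is Hive's and is not
    reproduced here:

    // Sketch only; not code from this commit.
    public final class BucketAssignmentSketch {
      // Maps a value's hash code to a bucket id in [0, numBuckets).
      static int bucketFor(int hashCode, int numBuckets) {
        if (numBuckets <= 0) {
          throw new IllegalArgumentException("numBuckets must be > 0");
        }
        // Mask the sign bit so negative hash codes still map to a valid bucket.
        return (hashCode & Integer.MAX_VALUE) % numBuckets;
      }

      public static void main(String[] args) {
        // With 24 buckets, as in the example above, bucket ids fall in 0..23.
        System.out.println(bucketFor("hello".hashCode(), 24));
        System.out.println(bucketFor(Integer.hashCode(42), 24));
      }
    }
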
    This commit is the first subtask of IMPALA-3118.
    
    Change-Id: I919b4d4139bc3a7784fa6fdb6f064e25666d548e
    Reviewed-on: http://gerrit.cloudera.org:8080/19055
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/CatalogObjects.thrift                | 19 ++++++
 common/thrift/JniCatalog.thrift                    |  5 +-
 fe/src/main/cup/sql-parser.cup                     | 39 ++++++++++--
 .../impala/analysis/CreateTableLikeFileStmt.java   |  2 +-
 .../apache/impala/analysis/CreateTableStmt.java    |  3 +
 .../impala/analysis/DropTableOrViewStmt.java       |  5 +-
 .../java/org/apache/impala/analysis/TableDef.java  | 72 +++++++++++++++++++++-
 .../org/apache/impala/analysis/ToSqlUtils.java     | 25 ++++++--
 .../apache/impala/service/CatalogOpExecutor.java   | 14 +++++
 .../java/org/apache/impala/util/BucketUtils.java   | 41 ++++++++++++
 fe/src/main/jflex/sql-scanner.flex                 |  2 +
 .../org/apache/impala/analysis/AnalyzeDDLTest.java | 51 +++++++++++++++
 .../org/apache/impala/analysis/AnalyzerTest.java   |  2 +-
 .../org/apache/impala/analysis/ParserTest.java     | 32 ++++++++--
 .../java/org/apache/impala/analysis/ToSqlTest.java | 18 ++++++
 .../queries/QueryTest/create-table.test            | 51 +++++++++++++++
 .../queries/QueryTest/show-create-table.test       | 21 +++++++
 tests/metadata/test_show_create_table.py           |  2 +-
 18 files changed, 380 insertions(+), 24 deletions(-)

diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 846baa0dc..b5a7a0b4f 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -152,6 +152,15 @@ enum TIcebergPartitionTransformType {
   VOID = 7
 }
 
+// Data distribution method of bucketed table.
+// (Easy to add more types later.)
+enum TBucketType {
+  // Non-Bucketed
+  NONE = 0
+  // For hive compatibility, the hash function used in Hive's bucketed tables
+  HASH = 1
+}
+
 struct TCompressionCodec {
   // Compression codec
   1: required THdfsCompression codec
@@ -185,6 +194,13 @@ struct TTableStats {
   2: optional i64 total_file_bytes
 }
 
+// Represents the bucket spec of a table.
+struct TBucketInfo {
+  1: required TBucketType bucket_type
+  2: optional list<string> bucket_columns
+  3: required i32 num_bucket
+}
+
 // Column stats data that Impala uses.
 struct TColumnStats {
   // Average size and max size, in bytes. Excludes serialization overhead.
@@ -479,6 +495,9 @@ struct THdfsTable {
 
   // Set iff this is an acid table. The valid write ids list.
   13: optional TValidWriteIdList valid_write_ids
+
+  // Bucket information for HDFS tables
+  16: optional TBucketInfo bucket_info
 }
 
 struct THBaseTable {
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index dd87ba72b..79eb2989a 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -591,6 +591,9 @@ struct TCreateTableParams {
 
   // Just one PartitionSpec when create iceberg table
   21: optional CatalogObjects.TIcebergPartitionSpec partition_spec
+
+  // Bucket desc for created bucketed table
+  22: optional CatalogObjects.TBucketInfo bucket_info
 }
 
 // Parameters of a CREATE VIEW or ALTER VIEW AS SELECT command
@@ -993,4 +996,4 @@ struct TEventProcessorMetricsSummaryResponse {
   // summary view of the events processor which can include status,
   // metrics and other details
   1: required string summary
-}
+}
\ No newline at end of file
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 99fe7f007..cc6760897 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -50,6 +50,8 @@ import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
 import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TBucketInfo;
+import org.apache.impala.thrift.TBucketType;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TFunctionCategory;
@@ -334,6 +336,7 @@ terminal BigDecimal DECIMAL_LITERAL;
 terminal String STRING_LITERAL;
 terminal String UNMATCHED_STRING_LITERAL;
 terminal String UNEXPECTED_CHAR;
+terminal KW_CLUSTERED, KW_BUCKETS;
 
 // IMPALA-3726 introduced the DEFAULT keyword which could break existing applications
 // that use the identifier "KEYWORD" as database, column or table names. To avoid that,
@@ -387,6 +390,7 @@ nonterminal List<Expr> expr_list;
 nonterminal String alias_clause;
 nonterminal List<String> ident_list, primary_keys;
 nonterminal List<String> opt_ident_list;
+nonterminal Pair<TBucketInfo, Pair<List<String>, TSortingOrder>> opt_clustered;
 nonterminal Pair<List<String>, TSortingOrder> opt_sort_cols;
 nonterminal TableName table_name;
 nonterminal ColumnName column_name;
@@ -1628,16 +1632,35 @@ enable_spec ::=
   ;
 
 tbl_options ::=
-  opt_sort_cols:sort_cols opt_comment_val:comment opt_row_format_val:row_format
-  serde_properties:serde_props file_format_create_table_val:file_format
-  location_val:location opt_cache_op_val:cache_op
+  opt_clustered:clustered
+  opt_comment_val:comment
+  opt_row_format_val:row_format
+  serde_properties:serde_props
+  file_format_create_table_val:file_format
+  location_val:location
+  opt_cache_op_val:cache_op
   tbl_properties:tbl_props
   {:
     CreateTableStmt.unescapeProperties(serde_props);
     CreateTableStmt.unescapeProperties(tbl_props);
 
-    RESULT = new TableDef.Options(sort_cols, comment, row_format, serde_props,
-        file_format, location, cache_op, tbl_props, parser.getQueryOptions());
+    RESULT = new TableDef.Options(clustered.first, clustered.second, comment, row_format,
+        serde_props, file_format, location, cache_op, tbl_props, parser.getQueryOptions());
+  :}
+  ;
+
+opt_clustered ::=
+  opt_sort_cols:sort_cols
+  {:
+    TBucketInfo bucket_info = new TBucketInfo(TBucketType.NONE, 0);
+    RESULT = new Pair<TBucketInfo, Pair<List<String>, TSortingOrder>>(bucket_info, sort_cols);
+  :}
+  | KW_CLUSTERED KW_BY LPAREN opt_ident_list:col_names RPAREN opt_sort_cols:sort_cols
+        KW_INTO INTEGER_LITERAL:numBuckets KW_BUCKETS
+  {:
+    TBucketInfo bucket_info = new TBucketInfo(TBucketType.HASH, numBuckets.intValue());
+    bucket_info.setBucket_columns(col_names);
+    RESULT = new Pair<TBucketInfo, Pair<List<String>, TSortingOrder>>(bucket_info, sort_cols);
   :}
   ;
 
@@ -3284,6 +3307,8 @@ plan_hints ::=
 plan_hint ::=
   KW_STRAIGHT_JOIN
   {: RESULT = new PlanHint("straight_join"); :}
+  | KW_CLUSTERED
+  {: RESULT = new PlanHint("clustered"); :}
   | IDENT:name
   {: RESULT = new PlanHint(name); :}
   | IDENT:name LPAREN ident_list:args RPAREN
@@ -4087,6 +4112,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_BOOLEAN:r
   {: RESULT = r.toString(); :}
+  | KW_BUCKETS:r
+  {: RESULT = r.toString(); :}
   | KW_BY:r
   {: RESULT = r.toString(); :}
   | KW_CACHED:r
@@ -4105,6 +4132,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_CLOSE_FN:r
   {: RESULT = r.toString(); :}
+  | KW_CLUSTERED:r
+  {: RESULT = r.toString(); :}
   | KW_COLUMN:r
   {: RESULT = r.toString(); :}
   | KW_COLUMNS:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
index f8a8e3d7d..d0efe18ed 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
@@ -57,7 +57,7 @@ public class CreateTableLikeFileStmt extends CreateTableStmt {
         null, null, null, new Pair<>(getSortColumns(), getSortingOrder()),
         getTblProperties(), getSerdeProperties(), isExternal(), getIfNotExists(),
         getRowFormat(), HdfsFileFormat.fromThrift(getFileFormat()), compression, null,
-        getLocation(), null);
+        getLocation(), null, null);
     s = s.replace("__LIKE_FILEFORMAT__", String.format("LIKE %s '%s'",
         schemaFileFormat_, schemaLocation_.toString()));
     return s;
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 242ba70a8..f2452aff8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -36,6 +36,7 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TBucketInfo;
 import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsCompression;
@@ -142,6 +143,7 @@ public class CreateTableStmt extends StatementBase {
   public Map<String, String> getGeneratedKuduProperties() {
     return tableDef_.getGeneratedProperties();
   }
+  public TBucketInfo geTBucketInfo() { return tableDef_.geTBucketInfo(); }
 
   // Only exposed for ToSqlUtils. Returns the list of primary keys declared by the user
   // at the table level. Note that primary keys may also be declared in column
@@ -215,6 +217,7 @@ public class CreateTableStmt extends StatementBase {
     if (getRowFormat() != null) params.setRow_format(getRowFormat().toThrift());
     params.setFile_format(getFileFormat());
     params.setIf_not_exists(getIfNotExists());
+    if (geTBucketInfo() != null) params.setBucket_info(geTBucketInfo());
     params.setSort_columns(getSortColumns());
     params.setSorting_order(getSortingOrder());
     params.setTable_properties(Maps.newHashMap(getTblProperties()));
diff --git a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
index f2348976f..594b7e376 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
@@ -28,6 +28,7 @@ import org.apache.impala.thrift.TAccessEvent;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDropTableOrViewParams;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.util.MetaStoreUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -135,7 +136,9 @@ public class DropTableOrViewStmt extends StatementBase {
         throw new AnalysisException(String.format(
             "DROP VIEW not allowed on a table: %s.%s", dbName_, getTbl()));
       }
-      if (dropTable_) {
+      // It currently supports create bucketed tables,
+      // but it should also support drop bucketed tables.
+      if (dropTable_ && !MetaStoreUtil.isBucketedTable(table.getMetaStoreTable())) {
         // To drop a view needs not write capabilities, only checks for tables.
         analyzer.checkTableCapability(table, Analyzer.OperationType.WRITE);
       }
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index dd1465f12..05da2420a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -42,6 +42,8 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TAccessEvent;
+import org.apache.impala.thrift.TBucketInfo;
+import org.apache.impala.thrift.TBucketType;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TQueryOptions;
@@ -150,8 +152,11 @@ class TableDef {
     // Sorting order for SORT BY queries.
     final TSortingOrder sortingOrder;
 
-    Options(Pair<List<String>, TSortingOrder> sortProperties, String comment,
-        RowFormat rowFormat, Map<String, String> serdeProperties,
+    // Bucket desc for CLUSTERED BY
+    final TBucketInfo bucketInfo;
+
+    Options(TBucketInfo bucketInfo, Pair<List<String>, TSortingOrder> sortProperties,
+        String comment, RowFormat rowFormat, Map<String, String> serdeProperties,
         THdfsFileFormat fileFormat, HdfsUri location, HdfsCachingOp cachingOp,
         Map<String, String> tblProperties, TQueryOptions queryOptions) {
       this.sortCols = sortProperties.first;
@@ -168,12 +173,13 @@ class TableDef {
       this.cachingOp = cachingOp;
       Preconditions.checkNotNull(tblProperties);
       this.tblProperties = tblProperties;
+      this.bucketInfo = bucketInfo;
     }
 
     public Options(String comment, TQueryOptions queryOptions) {
       // Passing null to file format so that it uses the file format from the query option
       // if specified, otherwise it will use the default file format, which is TEXT.
-      this(new Pair<>(ImmutableList.of(), TSortingOrder.LEXICAL), comment,
+      this(null, new Pair<>(ImmutableList.of(), TSortingOrder.LEXICAL), comment,
           RowFormat.DEFAULT_ROW_FORMAT, new HashMap<>(), /* file format */null, null,
           null, new HashMap<>(), queryOptions);
     }
@@ -393,6 +399,13 @@ class TableDef {
   RowFormat getRowFormat() { return options_.rowFormat; }
   TSortingOrder getSortingOrder() { return options_.sortingOrder; }
   List<ForeignKey> getForeignKeysList() { return foreignKeysList_; }
+  TBucketInfo geTBucketInfo() { return options_.bucketInfo; }
+
+  boolean isBucketableFormat() {
+    return options_.fileFormat != THdfsFileFormat.KUDU
+        && options_.fileFormat != THdfsFileFormat.ICEBERG
+        && options_.fileFormat != THdfsFileFormat.HUDI_PARQUET;
+  }
 
   /**
    * Analyzes the parameters of a CREATE TABLE statement.
@@ -739,6 +752,10 @@ class TableDef {
           options_.sortingOrder.toString()));
     }
 
+    // analyze bucket columns
+    analyzeBucketColumns(options_.bucketInfo, getColumnNames(),
+        getPartitionColumnNames());
+
     // Analyze sort columns.
     if (options_.sortCols == null) return;
     if (isKuduTable()) {
@@ -750,6 +767,55 @@ class TableDef {
         getColumnTypes(), options_.sortingOrder);
   }
 
+  private void analyzeBucketColumns(TBucketInfo bucketInfo, List<String> tableCols,
+      List<String> partitionCols) throws AnalysisException {
+    if (bucketInfo == null || bucketInfo.getBucket_type() == TBucketType.NONE) {
+      return;
+    }
+    // Bucketed Table only support hdfs fileformat
+    if (!isBucketableFormat()) {
+      throw new AnalysisException(String.format("CLUSTERED BY not support fileformat: " +
+          "'%s'", options_.fileFormat));
+    }
+    if (bucketInfo.getNum_bucket() <= 0) {
+      throw new AnalysisException(String.format(
+          "Bucket's number must be greater than 0."));
+    }
+    if (bucketInfo.getBucket_columns() == null
+        || bucketInfo.getBucket_columns().size() == 0) {
+      throw new AnalysisException(String.format(
+          "Bucket columns must be not null."));
+    }
+
+    // The index of each bucket column in the list of table columns.
+    Set<Integer> colIdxs = new LinkedHashSet<>();
+    for (String bucketCol : bucketInfo.getBucket_columns()) {
+      // Make sure it's not a partition column.
+      if (partitionCols.contains(bucketCol)) {
+        throw new AnalysisException(String.format("CLUSTERED BY column list must not " +
+            "contain partition column: '%s'", bucketCol));
+      }
+
+      // Determine the index of each bucket column in the list of table columns.
+      boolean foundColumn = false;
+      for (int j = 0; j < tableCols.size(); ++j) {
+        if (tableCols.get(j).equalsIgnoreCase(bucketCol)) {
+          if (colIdxs.contains(j)) {
+            throw new AnalysisException(String.format("Duplicate column in CLUSTERED " +
+                "BY list: %s", bucketCol));
+          }
+          colIdxs.add(j);
+          foundColumn = true;
+          break;
+        }
+      }
+      if (!foundColumn) {
+        throw new AnalysisException(String.format("Could not find CLUSTERED BY column " +
+            "'%s' in table.", bucketCol));
+      }
+    }
+  }
+
   private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException {
     if (options_.rowFormat == null) return;
     if (isKuduTable()) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index e261c310b..8136eb06c 100755
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -50,9 +50,12 @@ import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TBucketInfo;
+import org.apache.impala.thrift.TBucketType;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.BucketUtils;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.KuduUtil;
 import org.slf4j.Logger;
@@ -291,7 +294,8 @@ public class ToSqlUtils {
         kuduParamsSql, new Pair<>(stmt.getSortColumns(), stmt.getSortingOrder()),
         properties, stmt.getSerdeProperties(), stmt.isExternal(), stmt.getIfNotExists(),
         stmt.getRowFormat(), HdfsFileFormat.fromThrift(stmt.getFileFormat()),
-        HdfsCompression.NONE, null, stmt.getLocation(), icebergPartitionSpecs);
+        HdfsCompression.NONE, null, stmt.getLocation(),
+        icebergPartitionSpecs, stmt.geTBucketInfo());
   }
 
   /**
@@ -328,7 +332,7 @@ public class ToSqlUtils {
         innerStmt.getSortingOrder()), properties, innerStmt.getSerdeProperties(),
         innerStmt.isExternal(), innerStmt.getIfNotExists(), innerStmt.getRowFormat(),
         HdfsFileFormat.fromThrift(innerStmt.getFileFormat()), HdfsCompression.NONE, null,
-        innerStmt.getLocation(), icebergPartitionSpecs);
+        innerStmt.getLocation(), icebergPartitionSpecs, innerStmt.geTBucketInfo());
     return createTableSql + " AS " + stmt.getQueryStmt().toSql(options);
   }
 
@@ -372,6 +376,7 @@ public class ToSqlUtils {
     HdfsCompression compression = null;
     String location = isHbaseTable ? null : msTable.getSd().getLocation();
     Map<String, String> serdeParameters = msTable.getSd().getSerdeInfo().getParameters();
+    TBucketInfo bucketInfo = BucketUtils.fromStorageDescriptor(msTable.getSd());
 
     String storageHandlerClassName = table.getStorageHandlerClassName();
     List<String> primaryKeySql = new ArrayList<>();
@@ -447,7 +452,7 @@ public class ToSqlUtils {
         partitionColsSql, primaryKeySql, foreignKeySql, kuduPartitionByParams,
         new Pair<>(sortColsSql, sortingOrder), properties, serdeParameters,
         isExternal, false, rowFormat, format, compression,
-        storageHandlerClassName, tableLocation, icebergPartitions);
+        storageHandlerClassName, tableLocation, icebergPartitions, bucketInfo);
   }
 
   /**
@@ -462,7 +467,8 @@ public class ToSqlUtils {
       Map<String, String> tblProperties, Map<String, String> serdeParameters,
       boolean isExternal, boolean ifNotExists, RowFormat rowFormat,
       HdfsFileFormat fileFormat, HdfsCompression compression,
-      String storageHandlerClass, HdfsUri location, String icebergPartitions) {
+      String storageHandlerClass, HdfsUri location, String icebergPartitions,
+      TBucketInfo bucketInfo) {
     Preconditions.checkNotNull(tableName);
     StringBuilder sb = new StringBuilder("CREATE ");
     if (isExternal) sb.append("EXTERNAL ");
@@ -499,7 +505,16 @@ public class ToSqlUtils {
     if (kuduPartitionByParams != null && !kuduPartitionByParams.equals("")) {
       sb.append("PARTITION BY " + kuduPartitionByParams + "\n");
     }
-    if (sortProperties.first != null) {
+    if (bucketInfo != null && bucketInfo.getBucket_type() != TBucketType.NONE) {
+      sb.append(String.format("CLUSTERED BY (\n %s\n)\n",
+          Joiner.on(", \n  ").join(bucketInfo.getBucket_columns())));
+      if (sortProperties.first != null) {
+        sb.append(String.format("SORT BY %s (\n  %s\n)\n",
+            sortProperties.second.toString(),
+            Joiner.on(", \n  ").join(sortProperties.first)));
+      }
+      sb.append(String.format("INTO %s BUCKETS\n", bucketInfo.getNum_bucket()));
+    } else if (sortProperties.first != null) {
       sb.append(String.format("SORT BY %s (\n  %s\n)\n", sortProperties.second.toString(),
           Joiner.on(", \n  ").join(sortProperties.first)));
     }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 23712a844..ed896c42a 100755
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -174,6 +174,8 @@ import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
 import org.apache.impala.thrift.TAlterTableType;
 import org.apache.impala.thrift.TAlterTableUnSetTblPropertiesParams;
 import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
+import org.apache.impala.thrift.TBucketInfo;
+import org.apache.impala.thrift.TBucketType;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCatalogServiceRequestHeader;
@@ -3330,6 +3332,11 @@ public class CatalogOpExecutor {
       tbl.setTableType(TableType.MANAGED_TABLE.toString());
     }
 
+    // Set bucketing_version to table parameter
+    if (params.getBucket_info() != null
+        && params.getBucket_info().getBucket_type() != TBucketType.NONE) {
+      tbl.getParameters().put("bucketing_version", "2");
+    }
     tbl.setSd(createSd(params));
     if (params.getPartition_columns() != null) {
       // Add in any partition keys that were specified
@@ -3355,6 +3362,13 @@ public class CatalogOpExecutor {
 
     if (params.getLocation() != null) sd.setLocation(params.getLocation());
 
+    // Add bucket desc
+    if (params.getBucket_info() != null
+        && params.getBucket_info().getBucket_type() != TBucketType.NONE) {
+      sd.setBucketCols(params.getBucket_info().getBucket_columns());
+      sd.setNumBuckets(params.getBucket_info().getNum_bucket());
+    }
+
     // Add in all the columns
     sd.setCols(buildFieldSchemaList(params.getColumns()));
     return sd;
diff --git a/fe/src/main/java/org/apache/impala/util/BucketUtils.java b/fe/src/main/java/org/apache/impala/util/BucketUtils.java
new file mode 100644
index 000000000..67034d606
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/BucketUtils.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.impala.thrift.TBucketInfo;
+import org.apache.impala.thrift.TBucketType;
+
+import com.google.common.base.Preconditions;
+
+public class BucketUtils {
+
+  /**
+   * Parse to TBucketInfo from StorageDescriptor of the HMS table
+   */
+  public static TBucketInfo fromStorageDescriptor(StorageDescriptor sd) {
+    Preconditions.checkNotNull(sd);
+    if (sd.getBucketCols() == null || sd.getBucketCols().size() == 0
+        || sd.getNumBuckets() == 0) {
+      return new TBucketInfo(TBucketType.NONE, 0);
+    }
+    TBucketInfo bucketInfo = new TBucketInfo(TBucketType.HASH, sd.getNumBuckets());
+    bucketInfo.setBucket_columns(sd.getBucketCols());
+    return bucketInfo;
+  }
+}
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 48eb00d83..4acb7a136 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -82,6 +82,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("binary", SqlParserSymbols.KW_BINARY);
     keywordMap.put("block_size", SqlParserSymbols.KW_BLOCKSIZE);
     keywordMap.put("boolean", SqlParserSymbols.KW_BOOLEAN);
+    keywordMap.put("buckets", SqlParserSymbols.KW_BUCKETS);
     keywordMap.put("by", SqlParserSymbols.KW_BY);
     keywordMap.put("cached", SqlParserSymbols.KW_CACHED);
     keywordMap.put("cascade", SqlParserSymbols.KW_CASCADE);
@@ -90,6 +91,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("change", SqlParserSymbols.KW_CHANGE);
     keywordMap.put("char", SqlParserSymbols.KW_CHAR);
     keywordMap.put("class", SqlParserSymbols.KW_CLASS);
+    keywordMap.put("clustered", SqlParserSymbols.KW_CLUSTERED);
     keywordMap.put("close_fn", SqlParserSymbols.KW_CLOSE_FN);
     keywordMap.put("column", SqlParserSymbols.KW_COLUMN);
     keywordMap.put("columns", SqlParserSymbols.KW_COLUMNS);
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 49416a25e..30d02eff6 100755
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1346,6 +1346,25 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "ALTER TABLE SORT BY not supported on HBase tables.");
   }
 
+  @Test
+  public void TestAlterBucketedTable() throws AnalysisException {
+    AnalyzesOk("alter table functional.bucketed_table rename to bucketed_table_test");
+    AnalyzesOk("drop table functional.bucketed_table");
+
+    AnalysisError("alter table functional.bucketed_table add columns (a int)",
+        "functional.bucketed_table is a bucketed table. " +
+        "Only read operations are supported on such tables.");
+    AnalysisError("alter table functional.bucketed_table change col1 default bigint",
+        "functional.bucketed_table is a bucketed table. " +
+        "Only read operations are supported on such tables.");
+    AnalysisError("alter table functional.bucketed_table replace columns (a int)",
+        "functional.bucketed_table is a bucketed table. " +
+        "Only read operations are supported on such tables.");
+    AnalysisError("alter table functional.bucketed_table drop col1",
+        "functional.bucketed_table is a bucketed table. " +
+        "Only read operations are supported on such tables.");
+  }
+
   @Test
   public void TestAlterView() {
     // View-definition references a table.
@@ -2823,6 +2842,38 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "column: 'd'");
   }
 
+  @Test
+  public void TestCreateBucketedTable() throws AnalysisException {
+    AnalyzesOk("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY(i) INTO 24 BUCKETS");
+    AnalyzesOk("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY(i) SORT BY (s) INTO 24 BUCKETS");
+
+    // Bucketed table not supported for Kudu and ICEBERG table
+    AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY (i) INTO 24 BUCKETS STORED BY KUDU", "CLUSTERED BY not " +
+        "support fileformat: 'KUDU'");
+    AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY (i) INTO 24 BUCKETS STORED BY ICEBERG",
+        "CLUSTERED BY not support fileformat: 'ICEBERG'");
+    // Bucketed columns must not contain partition column and don't duplicate
+    AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+        "PARTITIONED BY(dt string) CLUSTERED BY (dt) INTO 24 BUCKETS",
+        "CLUSTERED BY column list must not contain partition column: 'dt'");
+    AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY (i, i) INTO 24 BUCKETS",
+        "Duplicate column in CLUSTERED BY list: i");
+    AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY (a) INTO 24 BUCKETS",
+        "Could not find CLUSTERED BY column 'a' in table.");
+    AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY (i) INTO 0 BUCKETS",
+        "Bucket's number must be greater than 0.");
+    AnalysisError("CREATE TABLE functional.bucket (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY () INTO 12 BUCKETS",
+        "Bucket columns must be not null.");
+  }
+
   @Test
   public void TestCreateAvroTest() {
     String alltypesSchemaLoc =
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index 0fcae9d30..0882bfb64 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -932,6 +932,7 @@ public class AnalyzerTest extends FrontendTestBase {
     AnalyzesOk("show column stats functional.bucketed_table");
     AnalyzesOk("create table test as select * from functional.bucketed_table");
     AnalyzesOk("compute stats functional.bucketed_table");
+    AnalyzesOk("drop table functional.bucketed_table");
 
     String errorMsgBucketed = "functional.bucketed_table " +
         "is a bucketed table. Only read operations are supported on such tables.";
@@ -959,7 +960,6 @@ public class AnalyzerTest extends FrontendTestBase {
     AnalysisError("insert into functional.bucketed_table select * from " +
        "functional.bucketed_table", errorMsgBucketed);
     AnalysisError("create table test like functional.bucketed_table", errorMsgBucketed);
-    AnalysisError("drop table functional.bucketed_table", errorMsgBucketed);
     AnalysisError("truncate table functional.bucketed_table", errorMsgBucketed);
     AnalysisError("alter table functional.bucketed_table add columns(col3 int)",
         errorMsgBucketed);
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 524041c4f..7f9e70eaf 100755
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3060,6 +3060,32 @@ public class ParserTest extends FrontendTestBase {
     ParserError("CREATE TABLE Foo(a int PRIMARY KEY, b int BLOCK_SIZE 1+1) " +
         "STORED AS KUDU");
     ParserError("CREATE TABLE Foo(a int PRIMARY KEY BLOCK_SIZE -1) STORED AS KUDU");
+
+    // Supported bucketed table
+    ParsesOk("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY (i) INTO 24 BUCKETS");
+    ParsesOk("CREATE TABLE bucketed_test (i int COMMENT 'hello', a int, s string) " +
+        "CLUSTERED BY (i, a) INTO 24 BUCKETS");
+
+    ParsesOk("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+        "PARTITIONED BY(dt string) CLUSTERED BY (i) INTO 24 BUCKETS");
+    ParsesOk("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY (i) SORT BY(s) INTO 24 BUCKETS");
+    ParsesOk("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+        "PARTITIONED BY(dt string) CLUSTERED BY (i) SORT BY (s) " +
+        "INTO 24 BUCKETS");
+
+    ParserError("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY (i)");
+    ParserError("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+        "CLUSTERED INTO 24 BUCKETS ");
+    ParserError("CREATE TABLE (i int, s string) CLUSTERED INTO 24 BUCKETS");
+    ParserError("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY (i) INTO BUCKETS");
+    ParserError("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+        "PARTITIONED BY(dt string) CLUSTERED BY (i) INTO BUCKETS");
+    ParserError("CREATE TABLE bucketed_test (i int COMMENT 'hello', s string) " +
+        "CLUSTERED BY (i) INTO 12 BUCKETS SORT BY (s)");
   }
 
   @Test
@@ -4362,10 +4388,4 @@ public class ParserTest extends FrontendTestBase {
     ParsesOk("--test\nSELECT 1\n");
     ParsesOk("--test\nSELECT 1\n  ");
   }
-
-  @Test
-  public void TestCreateBucketedTable() {
-    ParserError("Create table bucketed_tbl(order_id int, order_name string)"
-            + "clustered by (order_id) into 5 buckets", "Syntax error");
-  }
 }
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 865a72d20..0c053ce68 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -471,6 +471,24 @@ public class ToSqlTest extends FrontendTestBase {
         "SORT BY ZORDER ( int_col, id ) STORED AS TEXTFILE", true);
   }
 
+  @Test
+  public void TestCreateBucketTable() throws AnalysisException {
+    testToSql("create table bucketed_table ( a int ) " +
+        "CLUSTERED BY ( a ) into 24 buckets", "default",
+        "CREATE TABLE default.bucketed_table ( a INT ) " +
+        "CLUSTERED BY ( a ) INTO 24 BUCKETS STORED AS TEXTFILE", true);
+    testToSql("create table bucketed_table1 ( a int ) " +
+        "partitioned by ( dt string ) CLUSTERED BY ( a ) into 24 buckets", "default",
+        "CREATE TABLE default.bucketed_table1 ( a INT ) " +
+        "PARTITIONED BY ( dt STRING ) CLUSTERED BY ( a ) INTO 24 BUCKETS " +
+        "STORED AS TEXTFILE", true);
+    testToSql("create table bucketed_table2 ( a int, s string ) partitioned " +
+        "by ( dt string ) CLUSTERED BY ( a ) sort by (s) into 24 buckets",
+        "default", "CREATE TABLE default.bucketed_table2 " +
+        "( a INT, s STRING ) PARTITIONED BY ( dt STRING ) CLUSTERED BY ( a ) SORT BY " +
+        "LEXICAL ( s ) INTO 24 BUCKETS STORED AS TEXTFILE", true);
+  }
+
   @Test
   public void TestCreateView() throws AnalysisException {
     testToSql(
diff --git a/testdata/workloads/functional-query/queries/QueryTest/create-table.test b/testdata/workloads/functional-query/queries/QueryTest/create-table.test
index 9b3ae0c68..e293849e3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/create-table.test
@@ -309,3 +309,54 @@ describe formatted zsortbytest;
 ---- TYPES
 STRING,STRING,STRING
 ====
+---- QUERY
+# Create bucketed table
+create table bucketed_test (a int, b string) CLUSTERED BY (a) into 24 buckets;
+describe formatted bucketed_test;
+---- RESULTS: VERIFY_IS_NOT_IN
+'Num Buckets:        ','24                   ','NULL'
+'Bucket Columns:     ','[a]                  ','NULL'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+select a, b from bucketed_test t;
+---- RESULTS
+---- TYPES
+INT, STRING
+====
+---- QUERY
+# Create bucketed table
+create table bucketed_test2 (a int, b string) partitioned by(day string)
+CLUSTERED BY (a) into 24 buckets;
+describe formatted bucketed_test2;
+---- RESULTS: VERIFY_IS_NOT_IN
+'Num Buckets:        ','24                   ','NULL'
+'Bucket Columns:     ','[a]                  ','NULL'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+select a, b from bucketed_test2 t where day = '2022-09-29';
+---- RESULTS
+---- TYPES
+INT, STRING
+====
+---- QUERY
+# Create bucketed table
+create table bucketed_test3 (a int, b string) CLUSTERED BY (a) sort by (b) into 24 buckets;
+describe formatted bucketed_test3;
+---- RESULTS: VERIFY_IS_SUBSET
+'','bucketing_version   ','2                   '
+'','sort.columns        ','b                   '
+'Num Buckets:        ','24                  ','NULL'
+'Bucket Columns:     ','[a]                 ','NULL'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+select a, b from bucketed_test3 t;
+---- RESULTS
+---- TYPES
+INT, STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index c2485aafa..a5293bee8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -1012,3 +1012,24 @@ LOCATION '$$location_uri$$'
 TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
  'engine.hive.enabled'='true', 'table_type'='ICEBERG', 'write.merge.mode'='copy-on-write')
 ====
+---- CREATE_TABLE
+# Test create Bucketed Table
+CREATE TABLE bucketed_test (a int, b string) CLUSTERED BY (a) into 4 buckets;
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.bucketed_test (a INT, b STRING)
+CLUSTERED BY (a) INTO 4 BUCKETS
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('bucketing_version'='2', 'external.table.purge'='TRUE')
+====
+---- CREATE_TABLE
+# Test create Bucketed Table
+CREATE TABLE bucketed_sort_test (a int, b string) CLUSTERED BY (a) sort by (b) into 4 buckets;
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.bucketed_sort_test (a INT, b STRING)
+CLUSTERED BY (a) SORT BY LEXICAL (b) INTO 4 BUCKETS
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('bucketing_version'='2', 'sort.columns'='b', 'sort.order'='LEXICAL',
+'external.table.purge'='TRUE')
+====
\ No newline at end of file
diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py
index f7fabf155..f1d44627e 100644
--- a/tests/metadata/test_show_create_table.py
+++ b/tests/metadata/test_show_create_table.py
@@ -45,7 +45,7 @@ class TestShowCreateTable(ImpalaTestSuite):
                            "impala.events.catalogVersion", "uuid",
                            "current-schema", "snapshot-count", "default-partition-spec",
                            "current-snapshot-id", "current-snapshot-summary",
-                           "current-snapshot-timestamp-ms"]
+                           "current-snapshot-timestamp-ms", "sort.columns", "sort.order"]
 
   @classmethod
   def get_workload(self):