You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/07/16 20:47:50 UTC

[impala] 01/02: IMPALA-8593: Support table capabilities handling with Hive 3

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 23855c8e623e192ead788e1602559af9b81582f0
Author: Yongzhi Chen <yc...@cloudera.com>
AuthorDate: Fri Jun 7 08:10:43 2019 -0400

    IMPALA-8593: Support table capabilities handling with Hive 3
    
    This patch adds a method to check if a table bucketed.
    For Hive 3, integrates with HMS translation layer for
    capabilities checks.
    Implements methods ensureTableWriteSupported and
    ensureTableReadSupported.
    Set default capabilities for tables.
    
    Tests:
    Added unit tests to ParserTest and AnalyzerTest.
    Added bucketed tables which are required by IMPALA-8439.
    Ran core tests(Hive 2 and Hive 3)
    
    ToDo:
    Integrate checking bucketed tables capabilities and creating
    error messages with HMS translation after Hive provides the
    required functions.
    Enable capabilities checking for Kudu tables.
    When upgrade tables from non-acid to acid, the default
    capabilities should be changed too. Currently, use the
    workaround by explicitly setting tblproperties OBJCAPABILITIES
    with the acid properties.
    
    Change-Id: Ia08d01168660830b6e0d08b55a95eac129889cec
    Reviewed-on: http://gerrit.cloudera.org:8080/13558
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/compat/MetastoreShim.java    |  16 +++
 .../org/apache/impala/compat/MetastoreShim.java    | 114 ++++++++++++++++++++-
 .../org/apache/impala/analysis/AlterTableStmt.java |   2 +-
 .../java/org/apache/impala/analysis/Analyzer.java  |  99 ++++++++++++++++++
 .../org/apache/impala/analysis/BaseTableRef.java   |   2 +-
 .../impala/analysis/CreateTableLikeStmt.java       |   1 +
 .../impala/analysis/DropTableOrViewStmt.java       |   6 +-
 .../org/apache/impala/analysis/InsertStmt.java     |   4 +-
 .../org/apache/impala/analysis/LoadDataStmt.java   |   2 +-
 .../org/apache/impala/analysis/TruncateStmt.java   |   2 +-
 .../impala/catalog/local/DirectMetaProvider.java   |   3 +
 .../apache/impala/service/CatalogOpExecutor.java   |  40 ++++++++
 .../java/org/apache/impala/service/JniCatalog.java |   5 +
 .../java/org/apache/impala/util/MetaStoreUtil.java |   9 ++
 .../org/apache/impala/analysis/AnalyzerTest.java   |  42 ++++++++
 .../org/apache/impala/analysis/ParserTest.java     |   6 ++
 .../impala/testutil/CatalogServiceTestCatalog.java |   7 ++
 .../functional/functional_schema_template.sql      |  40 ++++++++
 .../datasets/functional/schema_constraints.csv     |  10 ++
 tests/metadata/test_ddl.py                         |  17 ++-
 tests/metadata/test_show_create_table.py           |   2 +-
 21 files changed, 418 insertions(+), 11 deletions(-)

diff --git a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
index 2064a09..61ec3e5 100644
--- a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
@@ -284,6 +284,22 @@ public class MetastoreShim {
     return -1L;
   }
 
+  public static boolean hasTableCapability(Table msTbl, byte requiredCapability) {
+    throw new UnsupportedOperationException("hasTableCapability not supported");
+  }
+
+  public static String getTableAccessType(Table msTbl) {
+    throw new UnsupportedOperationException("getTableAccessType not supported");
+  }
+
+  public static void setTableAccessType(Table msTbl, byte accessType) {
+    throw new UnsupportedOperationException("setTableAccessType not supported");
+  }
+
+  public static void setHiveClientCapabilities() {
+    throw new UnsupportedOperationException("setHiveClientCapabilities not supported");
+  }
+
   /**
    * @return the shim version.
    */
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index abae8e5..db06ffe 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -17,17 +17,24 @@
 
 package org.apache.impala.compat;
 
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.ACCESSTYPE_NONE;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.ACCESSTYPE_READONLY;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.ACCESSTYPE_READWRITE;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.ACCESSTYPE_WRITEONLY;
 import static org.apache.impala.service.MetadataOp.TABLE_TYPE_TABLE;
 import static org.apache.impala.service.MetadataOp.TABLE_TYPE_VIEW;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -55,6 +62,7 @@ import org.apache.impala.authorization.User;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.compat.HiveMetadataFormatUtils;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.Frontend;
 import org.apache.impala.service.MetadataOp;
 import org.apache.impala.thrift.TMetadataOpRequest;
@@ -68,6 +76,20 @@ import com.google.common.base.Preconditions;
  * between major versions of Hive. This implements the shimmed methods for Hive 3.
  */
 public class MetastoreShim {
+  private static final String EXTWRITE = "EXTWRITE";
+  private static final String EXTREAD = "EXTREAD";
+  private static final String HIVEBUCKET2 = "HIVEBUCKET2";
+  private static final String HIVEFULLACIDREAD = "HIVEFULLACIDREAD";
+  private static final String HIVEFULLACIDWRITE = "HIVEFULLACIDWRITE";
+  private static final String HIVEMANAGEDINSERTREAD = "HIVEMANAGEDINSERTREAD";
+  private static final String HIVEMANAGEDINSERTWRITE = "HIVEMANAGEDINSERTWRITE";
+  private static final String HIVEMANAGESTATS = "HIVEMANAGESTATS";
+  // Materialized View
+  private static final String HIVEMQT = "HIVEMQT";
+  // Virtual View
+  private static final String HIVESQL = "HIVESQL";
+  private static final long MAJOR_VERSION = 3;
+  private static boolean capabilitiestSet_ = false;
   /**
    * Wrapper around MetaStoreUtils.validateName() to deal with added arguments.
    */
@@ -423,9 +445,97 @@ public class MetastoreShim {
   }
 
   /**
-   * @return the shim major version
+   * Set impala capabilities to hive client
+   * Impala supports:
+   * - external table read/write
+   * - insert-only Acid table read
+   * - virtual view read
+   * - materialized view read
+   */
+  public static synchronized void setHiveClientCapabilities() {
+    String hostName;
+    if (capabilitiestSet_) return;
+    try {
+      hostName = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException ue) {
+      hostName = "unknown";
+    }
+    String buildVersion = BackendConfig.INSTANCE != null ?
+        BackendConfig.INSTANCE.getImpalaBuildVersion() : String.valueOf(MAJOR_VERSION);
+    if (buildVersion == null) buildVersion = String.valueOf(MAJOR_VERSION);
+
+    // TODO: Add HIVEMANAGEDINSERTWRITE once IMPALA-8636 goes in.
+    String impalaId = String.format("Impala%s@%s", buildVersion, hostName);
+    String[] capabilities = new String[] {
+        EXTWRITE, // External table write
+        EXTREAD,  // External table read
+        HIVEMANAGEDINSERTREAD,
+        HIVESQL,
+        HIVEMQT,
+        HIVEBUCKET2 // Includes the capability to get the correct bucket number.
+                    // Currently, without this capability, for an external bucketed
+                    // table, Hive will return the table as Read-only with bucket
+                    // number -1. It makes clients unable to know it is a bucketed table.
+                    // TODO: will remove this capability when Hive can provide
+                    // API calls to tell the changing of bucket number.
+        };
+
+    HiveMetaStoreClient.setProcessorIdentifier(impalaId);
+    HiveMetaStoreClient.setProcessorCapabilities(capabilities);
+    capabilitiestSet_ = true;
+  }
+
+  /**
+   * Check if a table has a capability
+   * @param msTble hms table
+   * @param requireCapability hive access types or combination of them
+   * @return true if the table has the capability
+   */
+  public static boolean hasTableCapability(Table msTbl, byte requiredCapability) {
+    Preconditions.checkNotNull(msTbl);
+    // access types in binary:
+    // ACCESSTYPE_NONE:      00000001
+    // ACCESSTYPE_READONLY:  00000010
+    // ACCESSTYPE_WRITEONLY: 00000100
+    // ACCESSTYPE_READWRITE: 00001000
+    return requiredCapability != ACCESSTYPE_NONE
+        && ((msTbl.getAccessType() & requiredCapability) != 0);
+  }
+
+  /**
+   * Get Access type in string
+   * @param msTble hms table
+   * @return the string represents the table access type.
+   */
+  public static String getTableAccessType(Table msTbl) {
+    Preconditions.checkNotNull(msTbl);
+    switch (msTbl.getAccessType()) {
+      case ACCESSTYPE_READONLY:
+        return "READONLY";
+      case ACCESSTYPE_WRITEONLY:
+        return "WRITEONLY";
+      case ACCESSTYPE_READWRITE:
+        return "READWRITE";
+      case ACCESSTYPE_NONE:
+      default:
+        return "NONE";
+    }
+  }
+
+  /**
+   * Set table access type. This is useful for hms Table object constructed for create
+   * table statement. For example, to create a table, we need Read/Write capabilities
+   * not default 0(not defined)
+   */
+  public static void setTableAccessType(Table msTbl, byte accessType) {
+    Preconditions.checkNotNull(msTbl);
+    msTbl.setAccessType(accessType);
+  }
+
+  /**
+   * @return the hive major version
    */
   public static long getMajorVersion() {
-    return 3;
+    return MAJOR_VERSION;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
index 1a31195..c023df7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
@@ -90,7 +90,7 @@ public abstract class AlterTableStmt extends StatementBase {
     }
     Preconditions.checkState(tableRef instanceof BaseTableRef);
     table_ = tableRef.getTable();
-    analyzer.ensureTableNotTransactional(table_);
+    analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
     if (table_ instanceof FeDataSourceTable
         && !(this instanceof AlterTableSetColumnStats)) {
       throw new AnalysisException(String.format(
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 6aa29d9..f0656ec 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -51,8 +51,10 @@ import org.apache.impala.catalog.FeIncompleteTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
+import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
+import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.IdGenerator;
 import org.apache.impala.common.ImpalaException;
@@ -85,6 +87,7 @@ import org.apache.impala.util.Graph.SccCondensedGraph;
 import org.apache.impala.util.Graph.WritableGraph;
 import org.apache.impala.util.IntIterator;
 import org.apache.impala.util.ListMap;
+import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.TSessionStateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -124,6 +127,9 @@ import com.google.common.collect.Sets;
  * more accurately and consistently here and elsewhere.
  */
 public class Analyzer {
+  public static final byte ACCESSTYPE_READ = (byte)2;
+  public static final byte ACCESSTYPE_WRITE = (byte)4;
+  public static final byte ACCESSTYPE_READWRITE = (byte)8;
   // Common analysis error messages
   public final static String DB_DOES_NOT_EXIST_ERROR_MSG = "Database does not exist: ";
   public final static String DB_ALREADY_EXISTS_ERROR_MSG = "Database already exists: ";
@@ -141,6 +147,10 @@ public class Analyzer {
   private static final String TRANSACTIONAL_TABLE_NOT_SUPPORTED =
       "Table %s not supported. Transactional (ACID) tables are " +
       "only supported for read.";
+  private static final String BUCKETED_TABLE_NOT_SUPPORTED =
+      "%s is a bucketed table. Only read operations are supported on such tables.";
+  private static final String TABLE_NOT_SUPPORTED =
+      "%s not supported. Table %s  access type is: %s";
 
   private final static Logger LOG = LoggerFactory.getLogger(Analyzer.class);
 
@@ -185,6 +195,13 @@ public class Analyzer {
   // except its own. Therefore, only a single semi-joined tuple can be visible at a time.
   private TupleId visibleSemiJoinedTupleId_ = null;
 
+  // Required Operation type: Read, write, any(read or write).
+  public enum OperationType {
+    READ,
+    WRITE,
+    ANY
+  };
+
   public void setIsSubquery() {
     isSubquery_ = true;
     globalState_.containsSubquery = true;
@@ -227,6 +244,88 @@ public class Analyzer {
     }
   }
 
+  /**
+   * @param table Table need to be checked
+   * @throws AnalysisException If table is a bucketed table.
+   */
+  public static void ensureTableNotBucketed(FeTable table)
+      throws AnalysisException {
+    if (MetaStoreUtil.isBucketedTable(table.getMetaStoreTable())) {
+      throw new AnalysisException(String.format(BUCKETED_TABLE_NOT_SUPPORTED,
+              table.getFullName()));
+    }
+  }
+
+  /**
+   * Check if the table supports the operation
+   * @param table Table need to check
+   * @param operationType The type of operation
+   * @throws AnalysisException If the table does not support the operation
+   */
+  public static void checkTableCapability(FeTable table, OperationType type)
+      throws AnalysisException {
+    switch(type) {
+      case WRITE:
+        ensureTableWriteSupported(table);
+        break;
+      case READ:
+      case ANY:
+      default:
+        ensureTableSupported(table);
+        break;
+    }
+  }
+
+  /**
+   * Check if the table supports write operations
+   * @param table Table need to check
+   * @throws AnalysisException If the table does not support write.
+   */
+  private static void ensureTableWriteSupported(FeTable table)
+      throws AnalysisException {
+    ensureTableNotBucketed(table);
+    if (MetastoreShim.getMajorVersion() > 2) {
+      byte writeRequires = ACCESSTYPE_WRITE | ACCESSTYPE_READWRITE;
+      // Kudu tables do not put new table properties to HMS and HMS need
+      // OBJCAPABILIES to grant managed unacid table write permission
+      // TODO: remove following kudu check when these issues are fixed
+      if (KuduTable.isKuduTable(table.getMetaStoreTable())) return;
+      if (!MetastoreShim.hasTableCapability(table.getMetaStoreTable(), writeRequires)) {
+        // Error messages with explanations.
+        ensureTableNotTransactional(table);
+        throw new AnalysisException(String.format(TABLE_NOT_SUPPORTED, "Write",
+            table.getFullName(),
+            MetastoreShim.getTableAccessType(table.getMetaStoreTable())));
+      }
+    } else {
+      ensureTableNotTransactional(table);
+    }
+  }
+
+  /**
+   * Check if the table type is supported
+   * @param table Table need to check capabilities
+   * @throws AnalysisException if the table type is not supported.
+   */
+  private static void ensureTableSupported(FeTable table)
+      throws AnalysisException {
+    if (MetastoreShim.getMajorVersion() > 2) {
+      byte capabilities = ACCESSTYPE_READ | ACCESSTYPE_WRITE | ACCESSTYPE_READWRITE;
+      if (!MetastoreShim.hasTableCapability(table.getMetaStoreTable(), capabilities)) {
+        // Return error messages by table type checking
+        // TODO: After Hive provides API calls to send back hints on why
+        // the operations are not supported, we will generate error messages
+        // accordingly.
+        ensureTableNotFullAcid(table);
+        throw new AnalysisException(String.format(TABLE_NOT_SUPPORTED, "Operations",
+            table.getFullName(),
+            MetastoreShim.getTableAccessType(table.getMetaStoreTable())));
+      }
+    } else {
+      ensureTableNotTransactional(table);
+    }
+  }
+
   // State shared between all objects of an Analyzer tree. We use LinkedHashMap and
   // LinkedHashSet where applicable to preserve the iteration order and make the class
   // behave identical across different implementations of the JVM.
diff --git a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
index ea031a6..4c5fca9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
@@ -64,7 +64,7 @@ public class BaseTableRef extends TableRef {
         requireGrantOption_);
     desc_ = analyzer.registerTableRef(this);
     isAnalyzed_ = true;
-    analyzer.ensureTableNotFullAcid(getTable());
+    analyzer.checkTableCapability(getTable(), Analyzer.OperationType.ANY);
     analyzeTableSample(analyzer);
     analyzeHints(analyzer);
     analyzeJoin(analyzer);
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
index 4eec464..4a53e57 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
@@ -166,6 +166,7 @@ public class CreateTableLikeStmt extends StatementBase {
     FeTable srcTable = analyzer.getTable(srcTableName_, Privilege.VIEW_METADATA);
 
     analyzer.ensureTableNotFullAcid(srcTable);
+    analyzer.ensureTableNotBucketed(srcTable);
 
     if (KuduTable.isKuduTable(srcTable.getMetaStoreTable())) {
       throw new AnalysisException("Cloning a Kudu table using CREATE TABLE LIKE is " +
diff --git a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
index be083a4..74b3019 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
@@ -110,7 +110,6 @@ public class DropTableOrViewStmt extends StatementBase {
       FeTable table = analyzer.getTable(tableName_, /* add access event */ true,
           /* add column-level privilege */ false, Privilege.DROP);
       Preconditions.checkNotNull(table);
-      analyzer.ensureTableNotTransactional(table);
       if (table instanceof FeView && dropTable_) {
         throw new AnalysisException(String.format(
             "DROP TABLE not allowed on a view: %s.%s", dbName_, getTbl()));
@@ -119,6 +118,11 @@ public class DropTableOrViewStmt extends StatementBase {
         throw new AnalysisException(String.format(
             "DROP VIEW not allowed on a table: %s.%s", dbName_, getTbl()));
       }
+      if (dropTable_) {
+        // To drop a view needs not write capabilities, only checks for tables.
+        analyzer.checkTableCapability(table, Analyzer.OperationType.WRITE);
+      }
+
     } catch (TableLoadingException e) {
       // We should still try to DROP tables that failed to load, so that tables that are
       // in a bad state, eg. deleted externally from Kudu, can be dropped.
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 18bd264..f3f0f35 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -425,8 +425,6 @@ public class InsertStmt extends StatementBase {
               .build());
     }
 
-    analyzer.ensureTableNotTransactional(table_);
-
     // We do not support (in|up)serting into views.
     if (table_ instanceof FeView) {
       throw new AnalysisException(
@@ -434,6 +432,8 @@ public class InsertStmt extends StatementBase {
               table_.getFullName()));
     }
 
+    analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
+
     // We do not support (in|up)serting into tables with unsupported column types.
     for (Column c: table_.getColumns()) {
       if (!c.getType().isSupported()) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
index 31614ca..4f3f90b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
@@ -109,7 +109,7 @@ public class LoadDataStmt extends StatementBase {
       throw new AnalysisException("LOAD DATA only supported for HDFS tables: " +
           dbName_ + "." + getTbl());
     }
-    analyzer.ensureTableNotTransactional(table);
+    analyzer.checkTableCapability(table, Analyzer.OperationType.WRITE);
 
     // Analyze the partition spec, if one was specified.
     if (partitionSpec_ != null) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
index 64a1e99..60d6bf2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
@@ -67,7 +67,7 @@ public class TruncateStmt extends StatementBase {
       throw new AnalysisException(String.format(
           "TRUNCATE TABLE not supported on non-HDFS table: %s", table_.getFullName()));
     }
-    analyzer.ensureTableNotTransactional(table_);
+    analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 2aa3212..83a5ea8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -71,6 +71,9 @@ class DirectMetaProvider implements MetaProvider {
     // TODO(todd): this should probably be a process-wide singleton.
     if (msClientPool_ == null) {
       TBackendGflags cfg = BackendConfig.INSTANCE.getBackendCfg();
+      if (MetastoreShim.getMajorVersion() > 2) {
+        MetastoreShim.setHiveClientCapabilities();
+      }
       msClientPool_ = new MetaStoreClientPool(cfg.num_metadata_loading_threads,
           cfg.initial_hms_cnxn_timeout_s);
     }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index ae968a9..0a15915 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.service;
 
+import static org.apache.impala.analysis.Analyzer.ACCESSTYPE_READWRITE;
+
 import com.google.common.collect.Iterables;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -87,6 +89,7 @@ import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
+import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -165,6 +168,7 @@ import org.apache.impala.thrift.TTruncateParams;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TUpdateCatalogResponse;
 import org.apache.impala.util.CompressionUtil;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.KuduUtil;
@@ -255,6 +259,14 @@ public class CatalogOpExecutor {
   private final static String HMS_RPC_ERROR_FORMAT_STR =
       "Error making '%s' RPC to Hive Metastore: ";
 
+  // Table capabilities property name
+  private static final String CAPABILITIES_KEY = "OBJCAPABILITIES";
+
+  // Table default capabilities
+  private static final String ACIDINSERTONLY_CAPABILITIES =
+      "HIVEMANAGEDINSERTREAD,HIVEMANAGEDINSERTWRITE";
+  private static final String NONACID_CAPABILITIES = "EXTREAD,EXTWRITE";
+
   // The maximum number of partitions to update in one Hive Metastore RPC.
   // Used when persisting the results of COMPUTE STATS statements.
   // It is also used as an upper limit for the number of partitions allowed in one ADD
@@ -1864,11 +1876,13 @@ public class CatalogOpExecutor {
     tbl.setDbName(tableName.getDb());
     tbl.setTableName(tableName.getTbl());
     tbl.setOwner(params.getOwner());
+
     if (params.isSetTable_properties()) {
       tbl.setParameters(params.getTable_properties());
     } else {
       tbl.setParameters(new HashMap<String, String>());
     }
+
     if (params.isSetSort_columns() && !params.sort_columns.isEmpty()) {
       tbl.getParameters().put(AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS,
           Joiner.on(",").join(params.sort_columns));
@@ -1890,6 +1904,8 @@ public class CatalogOpExecutor {
     } else {
       tbl.setPartitionKeys(new ArrayList<FieldSchema>());
     }
+
+    setDefaultTableCapabilities(tbl);
     return tbl;
   }
 
@@ -2173,10 +2189,34 @@ public class CatalogOpExecutor {
     }
     // Set the row count of this table to unknown.
     tbl.putToParameters(StatsSetupConst.ROW_COUNT, "-1");
+    setDefaultTableCapabilities(tbl);
     LOG.trace(String.format("Creating table %s LIKE %s", tblName, srcTblName));
     createTable(tbl, params.if_not_exists, null, params.server_name, response);
   }
 
+  private static void setDefaultTableCapabilities(
+      org.apache.hadoop.hive.metastore.api.Table tbl) {
+    if (MetastoreShim.getMajorVersion() > 2) {
+      // This hms table is for create table.
+      // It needs read/write access type,  not default value(0, undefined)
+      MetastoreShim.setTableAccessType(tbl, ACCESSTYPE_READWRITE);
+      // Set table default capabilities in HMS
+      if (tbl.getParameters().containsKey(CAPABILITIES_KEY)) return;
+      if (AcidUtils.isTransactionalTable(tbl.getParameters())) {
+        Preconditions.checkState(!AcidUtils.isFullAcidTable(tbl.getParameters()));
+        tbl.getParameters().put(CAPABILITIES_KEY, ACIDINSERTONLY_CAPABILITIES);
+      } else {
+        // Managed KUDU table has issues with extra table properties:
+        // 1. The property is not stored. 2. The table cannot be found after created.
+        // Related jira: IMPALA-8751
+        // Skip adding default capabilities for KUDU tables before the issues are fixed.
+        if (!KuduTable.isKuduTable(tbl)) {
+          tbl.getParameters().put(CAPABILITIES_KEY, NONACID_CAPABILITIES);
+        }
+      }
+    }
+  }
+
   /**
    * Sets the given params in the metastore table as appropriate for a
    * create view operation.
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index e7c5c9f..c23a446 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -35,6 +35,7 @@ import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.Function;
+import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.JniUtil;
@@ -120,6 +121,10 @@ public class JniCatalog {
 
     final AuthorizationConfig authzConfig = authzFactory.getAuthorizationConfig();
 
+    if (MetastoreShim.getMajorVersion() > 2) {
+      MetastoreShim.setHiveClientCapabilities();
+    }
+
     catalog_ = new CatalogServiceCatalog(cfg.load_catalog_in_background,
         cfg.num_metadata_loading_threads, cfg.initial_hms_cnxn_timeout_s, getServiceId(),
         cfg.local_library_path);
diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
index a1e58c0..b0c182f 100644
--- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
 import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.HdfsTable;
@@ -346,4 +347,12 @@ public class MetaStoreUtil {
 
     msClient.fireListenerEvent(rqst);
   }
+
+  /**
+   * Check if the hms table is a bucketed table or not
+   */
+  public static boolean isBucketedTable(Table msTbl) {
+    Preconditions.checkNotNull(msTbl);
+    return msTbl.getSd().getNumBuckets() > 0;
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index d5f42f8..8e3993c 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -877,4 +877,46 @@ public class AnalyzerTest extends FrontendTestBase {
         FunctionUtils.resolveFunction(Arrays.asList(fns), fnDate,
             Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF));
   }
+
+  @Test
+  public void testAnalyzeBucketed() {
+    AnalyzesOk("select count(*) from functional.bucketed_table");
+    AnalyzesOk("select count(*) from functional.bucketed_ext_table");
+    AnalyzesOk("drop stats functional.bucketed_table");
+    AnalyzesOk("describe functional.bucketed_table");
+    AnalyzesOk("show column stats functional.bucketed_table");
+    AnalyzesOk("create table test as select * from functional.bucketed_table");
+    AnalyzesOk("compute stats functional.bucketed_table");
+
+    String errorMsgBucketed = "functional.bucketed_table " +
+        "is a bucketed table. Only read operations are supported on such tables.";
+    String errorMsgExtBucketed = "functional.bucketed_ext_table " +
+        "is a bucketed table. Only read operations are supported on such tables.";
+    String errorMsgInsertOnlyBucketed =
+        "functional.insert_only_transactional_bucketed_table " +
+        "is a bucketed table. Only read operations are supported on such tables.";
+    String errorMsg = "Table bucketed_ext_table write not supported";
+
+    if (MetastoreShim.getMajorVersion() > 2) {
+      AnalyzesOk(
+          "select count(*) from functional.insert_only_transactional_bucketed_table");
+      AnalysisError("insert into functional.insert_only_transactional_bucketed_table " +
+          "select * from functional.insert_only_transactional_bucketed_table",
+          errorMsgInsertOnlyBucketed);
+      // Separates from Hive 2 as the error message may different after Hive
+      // provides error message needed information.
+      AnalysisError("insert into functional.bucketed_ext_table select * from " +
+          "functional.bucketed_ext_table", errorMsgExtBucketed);
+    } else {
+      AnalysisError("insert into functional.bucketed_ext_table select * from " +
+         "functional.bucketed_ext_table", errorMsgExtBucketed);
+    }
+    AnalysisError("insert into functional.bucketed_table select * from " +
+       "functional.bucketed_table", errorMsgBucketed);
+    AnalysisError("create table test like functional.bucketed_table", errorMsgBucketed);
+    AnalysisError("drop table functional.bucketed_table", errorMsgBucketed);
+    AnalysisError("truncate table functional.bucketed_table", errorMsgBucketed);
+    AnalysisError("alter table functional.bucketed_table add columns(col3 int)",
+        errorMsgBucketed);
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index d63bb7f..f67df17 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -4015,4 +4015,10 @@ public class ParserTest extends FrontendTestBase {
     ParsesOk("--test\nSELECT 1\n");
     ParsesOk("--test\nSELECT 1\n  ");
   }
+
+  @Test
+  public void TestCreateBucketedTable() {
+    ParserError("Create table bucketed_tbl(order_id int, order_name string)"
+            + "clustered by (order_id) into 5 buckets", "Syntax error");
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index 124f6da..637a114 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -23,6 +23,7 @@ import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.authorization.NoopAuthorizationFactory.NoopAuthorizationManager;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.MetaStoreClientPool;
+import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TUniqueId;
@@ -61,6 +62,9 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
     FeSupport.loadLibrary();
     CatalogServiceCatalog cs;
     try {
+      if (MetastoreShim.getMajorVersion() > 2) {
+        MetastoreShim.setHiveClientCapabilities();
+      }
       cs = new CatalogServiceTestCatalog(false, 16, new TUniqueId(),
           new MetaStoreClientPool(0, 0));
       cs.setAuthzManager(authzFactory.newAuthorizationManager(cs));
@@ -81,6 +85,9 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
     FeSupport.loadLibrary();
     Path derbyPath = Paths.get(System.getProperty("java.io.tmpdir"),
         UUID.randomUUID().toString());
+    if (MetastoreShim.getMajorVersion() > 2) {
+      MetastoreShim.setHiveClientCapabilities();
+    }
     CatalogServiceCatalog cs = new CatalogServiceTestCatalog(false, 16,
         new TUniqueId(), new EmbeddedMetastoreClientPool(0, derbyPath));
     cs.setAuthzManager(new NoopAuthorizationManager());
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 79d551d..d460eaa 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2162,6 +2162,46 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS {db_name}{db_suffix}.{table_name}
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+insert_only_transactional_bucketed_table
+---- HIVE_MAJOR_VERSION
+3
+---- CREATE_HIVE
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+  col1 int, col2 int)
+CLUSTERED BY (col1) INTO 5 BUCKETS
+STORED AS ORC
+TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only');
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+bucketed_ext_table
+---- CREATE_HIVE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+  col1 int, col2 int)
+CLUSTERED BY (col1) INTO 5 BUCKETS
+STORED AS {file_format}
+LOCATION '/test-warehouse/{db_name}{db_suffix}{table_name}';
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+bucketed_table
+---- CREATE_HIVE
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+  col1 int, col2 int)
+CLUSTERED BY (col1) INTO 5 BUCKETS
+STORED AS {file_format};
+---- LOAD
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
+SELECT id, int_col from functional.alltypes;
+---- DEPENDENT_LOAD_HIVE
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
+SELECT * from functional.{table_name};
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 testescape_16_lf
 ---- CREATE
 CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 0546c73..0c156c9 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -254,3 +254,13 @@ table_name:insert_only_transactional_table, constraint:exclude, table_format:kud
 # excluded.
 table_name:materialized_view, constraint:exclude, table_format:hbase/none/none
 table_name:materialized_view, constraint:exclude, table_format:kudu/none/none
+
+table_name:insert_only_transactional_bucketed_table, constraint:exclude, table_format:hbase/none/none
+table_name:insert_only_transactional_bucketed_table, constraint:exclude, table_format:kudu/none/none
+
+# Bucketed tables only work for file-format based tables
+table_name:bucketed_ext_table, constraint:exclude, table_format:hbase/none/none
+table_name:bucketed_ext_table, constraint:exclude, table_format:kudu/none/none
+table_name:bucketed_table, constraint:exclude, table_format:hbase/none/none
+table_name:bucketed_table, constraint:exclude, table_format:kudu/none/none
+table_name:bucketed_table, constraint:exclude, table_format:text/lzo/block
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index dce0b69..e5dfc25 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -23,10 +23,11 @@ import time
 
 from test_ddl_base import TestDdlBase
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.environ import (HIVE_MAJOR_VERSION)
 from tests.common.impala_test_suite import LOG
 from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import (SkipIf, SkipIfABFS, SkipIfADLS, SkipIfKudu, SkipIfLocal,
-                               SkipIfCatalogV2)
+                               SkipIfCatalogV2, SkipIfHive2)
 from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.util.filesystem_utils import (
     WAREHOUSE,
@@ -665,6 +666,9 @@ class TestDdlStatements(TestDdlBase):
     tblproperties ('p1'='v0', 'p1'='v1')""".format(fq_tbl_name))
     properties = self._get_tbl_properties(fq_tbl_name)
 
+    if HIVE_MAJOR_VERSION > 2:
+      assert properties['OBJCAPABILITIES'] == 'EXTREAD,EXTWRITE'
+      del properties['OBJCAPABILITIES']
     assert len(properties) == 2
     # The transient_lastDdlTime is variable, so don't verify the value.
     assert 'transient_lastDdlTime' in properties
@@ -685,12 +689,23 @@ class TestDdlStatements(TestDdlBase):
         "('prop1'='val1', 'p2'='val2', 'p2'='val3', ''='')".format(fq_tbl_name))
     properties = self._get_tbl_properties(fq_tbl_name)
 
+    if HIVE_MAJOR_VERSION > 2:
+      assert 'OBJCAPABILITIES' in properties
     assert 'transient_lastDdlTime' in properties
     assert properties['p1'] == 'v1'
     assert properties['prop1'] == 'val1'
     assert properties['p2'] == 'val3'
     assert properties[''] == ''
 
+  @SkipIfHive2.acid
+  def test_create_insertonly_tbl(self, vector, unique_database):
+    insertonly_tbl = unique_database + ".test_insertonly"
+    self.client.execute("""create table {0} (coli int) stored as parquet tblproperties(
+        'transactional'='true', 'transactional_properties'='insert_only')"""
+        .format(insertonly_tbl))
+    properties = self._get_tbl_properties(insertonly_tbl)
+    assert properties['OBJCAPABILITIES'] == 'HIVEMANAGEDINSERTREAD,HIVEMANAGEDINSERTWRITE'
+
   def test_alter_tbl_properties_reload(self, vector, unique_database):
     # IMPALA-8734: Force a table schema reload when setting table properties.
     tbl_name = "test_tbl"
diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py
index 3ce2748..2a3079c 100644
--- a/tests/metadata/test_show_create_table.py
+++ b/tests/metadata/test_show_create_table.py
@@ -36,7 +36,7 @@ class TestShowCreateTable(ImpalaTestSuite):
                            "numRows", "rawDataSize", "totalSize", "COLUMN_STATS_ACCURATE",
                            "STATS_GENERATED_VIA_STATS_TASK", "last_modified_by",
                            "last_modified_time", "numFilesErasureCoded",
-                           "bucketing_version"]
+                           "bucketing_version", "OBJCAPABILITIES"]
 
   @classmethod
   def get_workload(self):