Posted to commits@impala.apache.org by jo...@apache.org on 2019/07/16 20:47:50 UTC
[impala] 01/02: IMPALA-8593: Support table capabilities handling with Hive 3
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 23855c8e623e192ead788e1602559af9b81582f0
Author: Yongzhi Chen <yc...@cloudera.com>
AuthorDate: Fri Jun 7 08:10:43 2019 -0400
IMPALA-8593: Support table capabilities handling with Hive 3
This patch adds a method to check whether a table is bucketed.
For Hive 3, it integrates with the HMS translation layer for
capability checks.
Implements the methods ensureTableWriteSupported and
ensureTableSupported.
Sets default capabilities for tables.
Tests:
Added unit tests to ParserTest and AnalyzerTest.
Added bucketed tables, which are required by IMPALA-8439.
Ran core tests (Hive 2 and Hive 3).
ToDo:
Integrate bucketed-table capability checks and error-message
generation with HMS translation once Hive provides the
required functions.
Enable capability checking for Kudu tables.
When a table is upgraded from non-ACID to ACID, the default
capabilities should be changed too. The current workaround is
to explicitly set the OBJCAPABILITIES table property along
with the ACID properties, as in the sketch below.
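To illustrate, a minimal Java sketch of the workaround (class and method
names here are hypothetical, not part of this patch), updating a table's
HMS parameter map when upgrading to insert-only ACID:

    import java.util.HashMap;
    import java.util.Map;

    public class ObjCapabilitiesWorkaround {
      // Applies the explicit OBJCAPABILITIES workaround for a table upgraded
      // from non-ACID to insert-only ACID; the defaults set by this patch
      // are only applied at create time.
      public static Map<String, String> upgradeToInsertOnlyAcid(
          Map<String, String> params) {
        Map<String, String> updated = new HashMap<>(params);
        updated.put("transactional", "true");
        updated.put("transactional_properties", "insert_only");
        updated.put("OBJCAPABILITIES",
            "HIVEMANAGEDINSERTREAD,HIVEMANAGEDINSERTWRITE");
        return updated;
      }
    }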
Change-Id: Ia08d01168660830b6e0d08b55a95eac129889cec
Reviewed-on: http://gerrit.cloudera.org:8080/13558
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../org/apache/impala/compat/MetastoreShim.java | 16 +++
.../org/apache/impala/compat/MetastoreShim.java | 114 ++++++++++++++++++++-
.../org/apache/impala/analysis/AlterTableStmt.java | 2 +-
.../java/org/apache/impala/analysis/Analyzer.java | 99 ++++++++++++++++++
.../org/apache/impala/analysis/BaseTableRef.java | 2 +-
.../impala/analysis/CreateTableLikeStmt.java | 1 +
.../impala/analysis/DropTableOrViewStmt.java | 6 +-
.../org/apache/impala/analysis/InsertStmt.java | 4 +-
.../org/apache/impala/analysis/LoadDataStmt.java | 2 +-
.../org/apache/impala/analysis/TruncateStmt.java | 2 +-
.../impala/catalog/local/DirectMetaProvider.java | 3 +
.../apache/impala/service/CatalogOpExecutor.java | 40 ++++++++
.../java/org/apache/impala/service/JniCatalog.java | 5 +
.../java/org/apache/impala/util/MetaStoreUtil.java | 9 ++
.../org/apache/impala/analysis/AnalyzerTest.java | 42 ++++++++
.../org/apache/impala/analysis/ParserTest.java | 6 ++
.../impala/testutil/CatalogServiceTestCatalog.java | 7 ++
.../functional/functional_schema_template.sql | 40 ++++++++
.../datasets/functional/schema_constraints.csv | 10 ++
tests/metadata/test_ddl.py | 17 ++-
tests/metadata/test_show_create_table.py | 2 +-
21 files changed, 418 insertions(+), 11 deletions(-)
diff --git a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
index 2064a09..61ec3e5 100644
--- a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
@@ -284,6 +284,22 @@ public class MetastoreShim {
return -1L;
}
+ public static boolean hasTableCapability(Table msTbl, byte requiredCapability) {
+ throw new UnsupportedOperationException("hasTableCapability not supported");
+ }
+
+ public static String getTableAccessType(Table msTbl) {
+ throw new UnsupportedOperationException("getTableAccessType not supported");
+ }
+
+ public static void setTableAccessType(Table msTbl, byte accessType) {
+ throw new UnsupportedOperationException("setTableAccessType not supported");
+ }
+
+ public static void setHiveClientCapabilities() {
+ throw new UnsupportedOperationException("setHiveClientCapabilities not supported");
+ }
+
/**
* @return the shim version.
*/
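Since these Hive 2 stubs only throw, every caller added by this patch guards
the call with a version check (see DirectMetaProvider, JniCatalog, and
CatalogServiceTestCatalog below). A minimal standalone sketch of that
pattern, with stand-ins for the shim methods:

    public class ShimGuardSketch {
      // Stand-in for MetastoreShim.getMajorVersion(); returns 2 on a Hive 2 build.
      static long getMajorVersion() { return 2; }

      // Stand-in for the Hive 2 MetastoreShim.setHiveClientCapabilities() stub.
      static void setHiveClientCapabilities() {
        throw new UnsupportedOperationException(
            "setHiveClientCapabilities not supported");
      }

      public static void main(String[] args) {
        if (getMajorVersion() > 2) {
          setHiveClientCapabilities(); // never reached on a Hive 2 build
        } else {
          System.out.println("Hive 2 build: skipping capability registration");
        }
      }
    }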
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index abae8e5..db06ffe 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -17,17 +17,24 @@
package org.apache.impala.compat;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.ACCESSTYPE_NONE;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.ACCESSTYPE_READONLY;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.ACCESSTYPE_READWRITE;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.ACCESSTYPE_WRITEONLY;
import static org.apache.impala.service.MetadataOp.TABLE_TYPE_TABLE;
import static org.apache.impala.service.MetadataOp.TABLE_TYPE_VIEW;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
@@ -55,6 +62,7 @@ import org.apache.impala.authorization.User;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
import org.apache.impala.compat.HiveMetadataFormatUtils;
+import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.Frontend;
import org.apache.impala.service.MetadataOp;
import org.apache.impala.thrift.TMetadataOpRequest;
@@ -68,6 +76,20 @@ import com.google.common.base.Preconditions;
* between major versions of Hive. This implements the shimmed methods for Hive 3.
*/
public class MetastoreShim {
+ private static final String EXTWRITE = "EXTWRITE";
+ private static final String EXTREAD = "EXTREAD";
+ private static final String HIVEBUCKET2 = "HIVEBUCKET2";
+ private static final String HIVEFULLACIDREAD = "HIVEFULLACIDREAD";
+ private static final String HIVEFULLACIDWRITE = "HIVEFULLACIDWRITE";
+ private static final String HIVEMANAGEDINSERTREAD = "HIVEMANAGEDINSERTREAD";
+ private static final String HIVEMANAGEDINSERTWRITE = "HIVEMANAGEDINSERTWRITE";
+ private static final String HIVEMANAGESTATS = "HIVEMANAGESTATS";
+ // Materialized View
+ private static final String HIVEMQT = "HIVEMQT";
+ // Virtual View
+ private static final String HIVESQL = "HIVESQL";
+ private static final long MAJOR_VERSION = 3;
+ private static boolean capabilitiesSet_ = false;
/**
* Wrapper around MetaStoreUtils.validateName() to deal with added arguments.
*/
@@ -423,9 +445,97 @@ public class MetastoreShim {
}
/**
- * @return the shim major version
+ * Sets Impala's capabilities on the Hive client.
+ * Impala supports:
+ * - external table read/write
+ * - insert-only ACID table read
+ * - virtual view read
+ * - materialized view read
+ */
+ public static synchronized void setHiveClientCapabilities() {
+ if (capabilitiesSet_) return;
+ String hostName;
+ try {
+ hostName = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException ue) {
+ hostName = "unknown";
+ }
+ String buildVersion = BackendConfig.INSTANCE != null ?
+ BackendConfig.INSTANCE.getImpalaBuildVersion() : String.valueOf(MAJOR_VERSION);
+ if (buildVersion == null) buildVersion = String.valueOf(MAJOR_VERSION);
+
+ // TODO: Add HIVEMANAGEDINSERTWRITE once IMPALA-8636 goes in.
+ String impalaId = String.format("Impala%s@%s", buildVersion, hostName);
+ String[] capabilities = new String[] {
+ EXTWRITE, // External table write
+ EXTREAD, // External table read
+ HIVEMANAGEDINSERTREAD,
+ HIVESQL,
+ HIVEMQT,
+ HIVEBUCKET2 // Includes the capability to get the correct bucket number.
+ // Currently, without this capability, for an external bucketed
+ // table, Hive returns the table as read-only with bucket
+ // number -1, so clients cannot tell that it is a bucketed table.
+ // TODO: remove this capability once Hive provides API calls
+ // that report changes to the bucket number.
+ };
+
+ HiveMetaStoreClient.setProcessorIdentifier(impalaId);
+ HiveMetaStoreClient.setProcessorCapabilities(capabilities);
+ capabilitiesSet_ = true;
+ }
+
+ /**
+ * Checks if a table has a given capability.
+ * @param msTbl HMS table
+ * @param requiredCapability a Hive access type or a combination of them
+ * @return true if the table has the capability
+ */
+ public static boolean hasTableCapability(Table msTbl, byte requiredCapability) {
+ Preconditions.checkNotNull(msTbl);
+ // access types in binary:
+ // ACCESSTYPE_NONE: 00000001
+ // ACCESSTYPE_READONLY: 00000010
+ // ACCESSTYPE_WRITEONLY: 00000100
+ // ACCESSTYPE_READWRITE: 00001000
+ return requiredCapability != ACCESSTYPE_NONE
+ && ((msTbl.getAccessType() & requiredCapability) != 0);
+ }
+
+ /**
+ * Gets the access type as a string.
+ * @param msTbl HMS table
+ * @return the string representation of the table's access type.
+ */
+ public static String getTableAccessType(Table msTbl) {
+ Preconditions.checkNotNull(msTbl);
+ switch (msTbl.getAccessType()) {
+ case ACCESSTYPE_READONLY:
+ return "READONLY";
+ case ACCESSTYPE_WRITEONLY:
+ return "WRITEONLY";
+ case ACCESSTYPE_READWRITE:
+ return "READWRITE";
+ case ACCESSTYPE_NONE:
+ default:
+ return "NONE";
+ }
+ }
+
+ /**
+ * Sets the table access type. This is useful for an HMS Table object
+ * constructed for a CREATE TABLE statement, which needs the Read/Write
+ * access type rather than the default 0 (undefined).
+ */
+ public static void setTableAccessType(Table msTbl, byte accessType) {
+ Preconditions.checkNotNull(msTbl);
+ msTbl.setAccessType(accessType);
+ }
+
+ /**
+ * @return the Hive major version
*/
public static long getMajorVersion() {
- return 3;
+ return MAJOR_VERSION;
}
}
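To make the bitmask check in hasTableCapability() concrete, a small
self-contained Java example using the literal access-type values documented
in the comment above (NONE=1, READONLY=2, WRITEONLY=4, READWRITE=8; the
constant values are assumed from that comment):

    public class AccessTypeBitmaskDemo {
      static final byte ACCESSTYPE_NONE = 1;      // 00000001
      static final byte ACCESSTYPE_READONLY = 2;  // 00000010
      static final byte ACCESSTYPE_WRITEONLY = 4; // 00000100
      static final byte ACCESSTYPE_READWRITE = 8; // 00001000

      // Mirrors MetastoreShim.hasTableCapability(): any overlapping bit counts.
      static boolean hasCapability(byte accessType, byte required) {
        return required != ACCESSTYPE_NONE && (accessType & required) != 0;
      }

      public static void main(String[] args) {
        // A write requires WRITEONLY or READWRITE access (00001100).
        byte writeRequires = ACCESSTYPE_WRITEONLY | ACCESSTYPE_READWRITE;
        System.out.println(hasCapability(ACCESSTYPE_READONLY, writeRequires));  // false
        System.out.println(hasCapability(ACCESSTYPE_READWRITE, writeRequires)); // true
      }
    }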
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
index 1a31195..c023df7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
@@ -90,7 +90,7 @@ public abstract class AlterTableStmt extends StatementBase {
}
Preconditions.checkState(tableRef instanceof BaseTableRef);
table_ = tableRef.getTable();
- analyzer.ensureTableNotTransactional(table_);
+ analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
if (table_ instanceof FeDataSourceTable
&& !(this instanceof AlterTableSetColumnStats)) {
throw new AnalysisException(String.format(
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 6aa29d9..f0656ec 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -51,8 +51,10 @@ import org.apache.impala.catalog.FeIncompleteTable;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.FeView;
+import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.Type;
+import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.IdGenerator;
import org.apache.impala.common.ImpalaException;
@@ -85,6 +87,7 @@ import org.apache.impala.util.Graph.SccCondensedGraph;
import org.apache.impala.util.Graph.WritableGraph;
import org.apache.impala.util.IntIterator;
import org.apache.impala.util.ListMap;
+import org.apache.impala.util.MetaStoreUtil;
import org.apache.impala.util.TSessionStateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,6 +127,9 @@ import com.google.common.collect.Sets;
* more accurately and consistently here and elsewhere.
*/
public class Analyzer {
+ public static final byte ACCESSTYPE_READ = (byte)2;
+ public static final byte ACCESSTYPE_WRITE = (byte)4;
+ public static final byte ACCESSTYPE_READWRITE = (byte)8;
// Common analysis error messages
public final static String DB_DOES_NOT_EXIST_ERROR_MSG = "Database does not exist: ";
public final static String DB_ALREADY_EXISTS_ERROR_MSG = "Database already exists: ";
@@ -141,6 +147,10 @@ public class Analyzer {
private static final String TRANSACTIONAL_TABLE_NOT_SUPPORTED =
"Table %s not supported. Transactional (ACID) tables are " +
"only supported for read.";
+ private static final String BUCKETED_TABLE_NOT_SUPPORTED =
+ "%s is a bucketed table. Only read operations are supported on such tables.";
+ private static final String TABLE_NOT_SUPPORTED =
+ "%s not supported. Table %s access type is: %s";
private final static Logger LOG = LoggerFactory.getLogger(Analyzer.class);
@@ -185,6 +195,13 @@ public class Analyzer {
// except its own. Therefore, only a single semi-joined tuple can be visible at a time.
private TupleId visibleSemiJoinedTupleId_ = null;
+ // Required operation type: read, write, or any (read or write).
+ public enum OperationType {
+ READ,
+ WRITE,
+ ANY
+ };
+
public void setIsSubquery() {
isSubquery_ = true;
globalState_.containsSubquery = true;
@@ -227,6 +244,88 @@ public class Analyzer {
}
}
+ /**
+ * @param table the table to check
+ * @throws AnalysisException if the table is a bucketed table.
+ */
+ public static void ensureTableNotBucketed(FeTable table)
+ throws AnalysisException {
+ if (MetaStoreUtil.isBucketedTable(table.getMetaStoreTable())) {
+ throw new AnalysisException(String.format(BUCKETED_TABLE_NOT_SUPPORTED,
+ table.getFullName()));
+ }
+ }
+
+ /**
+ * Checks if the table supports the given operation.
+ * @param table the table to check
+ * @param type the type of operation
+ * @throws AnalysisException if the table does not support the operation
+ */
+ public static void checkTableCapability(FeTable table, OperationType type)
+ throws AnalysisException {
+ switch(type) {
+ case WRITE:
+ ensureTableWriteSupported(table);
+ break;
+ case READ:
+ case ANY:
+ default:
+ ensureTableSupported(table);
+ break;
+ }
+ }
+
+ /**
+ * Checks if the table supports write operations.
+ * @param table the table to check
+ * @throws AnalysisException if the table does not support writes.
+ */
+ private static void ensureTableWriteSupported(FeTable table)
+ throws AnalysisException {
+ ensureTableNotBucketed(table);
+ if (MetastoreShim.getMajorVersion() > 2) {
+ byte writeRequires = ACCESSTYPE_WRITE | ACCESSTYPE_READWRITE;
+ // Kudu tables do not propagate the new table properties to HMS, and HMS
+ // needs OBJCAPABILITIES to grant write permission on managed non-ACID tables.
+ // TODO: remove the following Kudu check when these issues are fixed.
+ if (KuduTable.isKuduTable(table.getMetaStoreTable())) return;
+ if (!MetastoreShim.hasTableCapability(table.getMetaStoreTable(), writeRequires)) {
+ // Error messages with explanations.
+ ensureTableNotTransactional(table);
+ throw new AnalysisException(String.format(TABLE_NOT_SUPPORTED, "Write",
+ table.getFullName(),
+ MetastoreShim.getTableAccessType(table.getMetaStoreTable())));
+ }
+ } else {
+ ensureTableNotTransactional(table);
+ }
+ }
+
+ /**
+ * Checks if the table type is supported.
+ * @param table the table whose capabilities are checked
+ * @throws AnalysisException if the table type is not supported.
+ */
+ private static void ensureTableSupported(FeTable table)
+ throws AnalysisException {
+ if (MetastoreShim.getMajorVersion() > 2) {
+ byte capabilities = ACCESSTYPE_READ | ACCESSTYPE_WRITE | ACCESSTYPE_READWRITE;
+ if (!MetastoreShim.hasTableCapability(table.getMetaStoreTable(), capabilities)) {
+ // Generate error messages based on table-type checks.
+ // TODO: After Hive provides API calls to send back hints on why
+ // the operations are not supported, we will generate error messages
+ // accordingly.
+ ensureTableNotFullAcid(table);
+ throw new AnalysisException(String.format(TABLE_NOT_SUPPORTED, "Operations",
+ table.getFullName(),
+ MetastoreShim.getTableAccessType(table.getMetaStoreTable())));
+ }
+ } else {
+ ensureTableNotTransactional(table);
+ }
+ }
+
// State shared between all objects of an Analyzer tree. We use LinkedHashMap and
// LinkedHashSet where applicable to preserve the iteration order and make the class
// behave identical across different implementations of the JVM.
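The statement classes below call the new checkTableCapability() entry point.
As a minimal sketch of the dispatch semantics only (the names here are
stand-ins, not the patch's code): WRITE gets the stricter write check, while
READ and ANY share the general support check.

    public class OperationDispatchSketch {
      enum OperationType { READ, WRITE, ANY }

      // Mirrors the switch in Analyzer.checkTableCapability().
      static String checkFor(OperationType type) {
        switch (type) {
          case WRITE: return "ensureTableWriteSupported";
          case READ:
          case ANY:
          default: return "ensureTableSupported";
        }
      }

      public static void main(String[] args) {
        System.out.println(checkFor(OperationType.WRITE)); // ensureTableWriteSupported
        System.out.println(checkFor(OperationType.ANY));   // ensureTableSupported
      }
    }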
diff --git a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
index ea031a6..4c5fca9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
@@ -64,7 +64,7 @@ public class BaseTableRef extends TableRef {
requireGrantOption_);
desc_ = analyzer.registerTableRef(this);
isAnalyzed_ = true;
- analyzer.ensureTableNotFullAcid(getTable());
+ analyzer.checkTableCapability(getTable(), Analyzer.OperationType.ANY);
analyzeTableSample(analyzer);
analyzeHints(analyzer);
analyzeJoin(analyzer);
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
index 4eec464..4a53e57 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
@@ -166,6 +166,7 @@ public class CreateTableLikeStmt extends StatementBase {
FeTable srcTable = analyzer.getTable(srcTableName_, Privilege.VIEW_METADATA);
analyzer.ensureTableNotFullAcid(srcTable);
+ analyzer.ensureTableNotBucketed(srcTable);
if (KuduTable.isKuduTable(srcTable.getMetaStoreTable())) {
throw new AnalysisException("Cloning a Kudu table using CREATE TABLE LIKE is " +
diff --git a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
index be083a4..74b3019 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
@@ -110,7 +110,6 @@ public class DropTableOrViewStmt extends StatementBase {
FeTable table = analyzer.getTable(tableName_, /* add access event */ true,
/* add column-level privilege */ false, Privilege.DROP);
Preconditions.checkNotNull(table);
- analyzer.ensureTableNotTransactional(table);
if (table instanceof FeView && dropTable_) {
throw new AnalysisException(String.format(
"DROP TABLE not allowed on a view: %s.%s", dbName_, getTbl()));
@@ -119,6 +118,11 @@ public class DropTableOrViewStmt extends StatementBase {
throw new AnalysisException(String.format(
"DROP VIEW not allowed on a table: %s.%s", dbName_, getTbl()));
}
+ if (dropTable_) {
+ // Dropping a view does not require write capabilities; only check tables.
+ analyzer.checkTableCapability(table, Analyzer.OperationType.WRITE);
+ }
+
} catch (TableLoadingException e) {
// We should still try to DROP tables that failed to load, so that tables that are
// in a bad state, eg. deleted externally from Kudu, can be dropped.
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 18bd264..f3f0f35 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -425,8 +425,6 @@ public class InsertStmt extends StatementBase {
.build());
}
- analyzer.ensureTableNotTransactional(table_);
-
// We do not support (in|up)serting into views.
if (table_ instanceof FeView) {
throw new AnalysisException(
@@ -434,6 +432,8 @@ public class InsertStmt extends StatementBase {
table_.getFullName()));
}
+ analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
+
// We do not support (in|up)serting into tables with unsupported column types.
for (Column c: table_.getColumns()) {
if (!c.getType().isSupported()) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
index 31614ca..4f3f90b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
@@ -109,7 +109,7 @@ public class LoadDataStmt extends StatementBase {
throw new AnalysisException("LOAD DATA only supported for HDFS tables: " +
dbName_ + "." + getTbl());
}
- analyzer.ensureTableNotTransactional(table);
+ analyzer.checkTableCapability(table, Analyzer.OperationType.WRITE);
// Analyze the partition spec, if one was specified.
if (partitionSpec_ != null) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
index 64a1e99..60d6bf2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
@@ -67,7 +67,7 @@ public class TruncateStmt extends StatementBase {
throw new AnalysisException(String.format(
"TRUNCATE TABLE not supported on non-HDFS table: %s", table_.getFullName()));
}
- analyzer.ensureTableNotTransactional(table_);
+ analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
}
@Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 2aa3212..83a5ea8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -71,6 +71,9 @@ class DirectMetaProvider implements MetaProvider {
// TODO(todd): this should probably be a process-wide singleton.
if (msClientPool_ == null) {
TBackendGflags cfg = BackendConfig.INSTANCE.getBackendCfg();
+ if (MetastoreShim.getMajorVersion() > 2) {
+ MetastoreShim.setHiveClientCapabilities();
+ }
msClientPool_ = new MetaStoreClientPool(cfg.num_metadata_loading_threads,
cfg.initial_hms_cnxn_timeout_s);
}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index ae968a9..0a15915 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -17,6 +17,8 @@
package org.apache.impala.service;
+import static org.apache.impala.analysis.Analyzer.ACCESSTYPE_READWRITE;
+
import com.google.common.collect.Iterables;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -87,6 +89,7 @@ import org.apache.impala.catalog.TableNotFoundException;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.View;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
+import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
@@ -165,6 +168,7 @@ import org.apache.impala.thrift.TTruncateParams;
import org.apache.impala.thrift.TUpdateCatalogRequest;
import org.apache.impala.thrift.TUpdateCatalogResponse;
import org.apache.impala.util.CompressionUtil;
+import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.FunctionUtils;
import org.apache.impala.util.HdfsCachingUtil;
import org.apache.impala.util.KuduUtil;
@@ -255,6 +259,14 @@ public class CatalogOpExecutor {
private final static String HMS_RPC_ERROR_FORMAT_STR =
"Error making '%s' RPC to Hive Metastore: ";
+ // Table capabilities property name
+ private static final String CAPABILITIES_KEY = "OBJCAPABILITIES";
+
+ // Table default capabilities
+ private static final String ACIDINSERTONLY_CAPABILITIES =
+ "HIVEMANAGEDINSERTREAD,HIVEMANAGEDINSERTWRITE";
+ private static final String NONACID_CAPABILITIES = "EXTREAD,EXTWRITE";
+
// The maximum number of partitions to update in one Hive Metastore RPC.
// Used when persisting the results of COMPUTE STATS statements.
// It is also used as an upper limit for the number of partitions allowed in one ADD
@@ -1864,11 +1876,13 @@ public class CatalogOpExecutor {
tbl.setDbName(tableName.getDb());
tbl.setTableName(tableName.getTbl());
tbl.setOwner(params.getOwner());
+
if (params.isSetTable_properties()) {
tbl.setParameters(params.getTable_properties());
} else {
tbl.setParameters(new HashMap<String, String>());
}
+
if (params.isSetSort_columns() && !params.sort_columns.isEmpty()) {
tbl.getParameters().put(AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS,
Joiner.on(",").join(params.sort_columns));
@@ -1890,6 +1904,8 @@ public class CatalogOpExecutor {
} else {
tbl.setPartitionKeys(new ArrayList<FieldSchema>());
}
+
+ setDefaultTableCapabilities(tbl);
return tbl;
}
@@ -2173,10 +2189,34 @@ public class CatalogOpExecutor {
}
// Set the row count of this table to unknown.
tbl.putToParameters(StatsSetupConst.ROW_COUNT, "-1");
+ setDefaultTableCapabilities(tbl);
LOG.trace(String.format("Creating table %s LIKE %s", tblName, srcTblName));
createTable(tbl, params.if_not_exists, null, params.server_name, response);
}
+ private static void setDefaultTableCapabilities(
+ org.apache.hadoop.hive.metastore.api.Table tbl) {
+ if (MetastoreShim.getMajorVersion() > 2) {
+ // This HMS table is being constructed for CREATE TABLE.
+ // It needs the read/write access type, not the default value (0, undefined).
+ MetastoreShim.setTableAccessType(tbl, ACCESSTYPE_READWRITE);
+ // Set table default capabilities in HMS
+ if (tbl.getParameters().containsKey(CAPABILITIES_KEY)) return;
+ if (AcidUtils.isTransactionalTable(tbl.getParameters())) {
+ Preconditions.checkState(!AcidUtils.isFullAcidTable(tbl.getParameters()));
+ tbl.getParameters().put(CAPABILITIES_KEY, ACIDINSERTONLY_CAPABILITIES);
+ } else {
+ // Managed Kudu tables have issues with extra table properties:
+ // 1. The property is not stored. 2. The table cannot be found after creation.
+ // Related JIRA: IMPALA-8751.
+ // Skip adding default capabilities for Kudu tables until these issues are fixed.
+ if (!KuduTable.isKuduTable(tbl)) {
+ tbl.getParameters().put(CAPABILITIES_KEY, NONACID_CAPABILITIES);
+ }
+ }
+ }
+ }
+
/**
* Sets the given params in the metastore table as appropriate for a
* create view operation.
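A hedged sketch of the default-capability decision in
setDefaultTableCapabilities() above, as a standalone method. It approximates
AcidUtils.isTransactionalTable() with a direct property check and omits the
Kudu and existing-OBJCAPABILITIES early exits:

    import java.util.HashMap;
    import java.util.Map;

    public class DefaultCapabilitiesSketch {
      // Insert-only ACID tables get the managed-insert capabilities;
      // everything else gets the external read/write defaults.
      static String defaultCapabilities(Map<String, String> params) {
        boolean insertOnlyAcid = "true".equalsIgnoreCase(params.get("transactional"))
            && "insert_only".equalsIgnoreCase(params.get("transactional_properties"));
        return insertOnlyAcid
            ? "HIVEMANAGEDINSERTREAD,HIVEMANAGEDINSERTWRITE"
            : "EXTREAD,EXTWRITE";
      }

      public static void main(String[] args) {
        Map<String, String> acid = new HashMap<>();
        acid.put("transactional", "true");
        acid.put("transactional_properties", "insert_only");
        System.out.println(defaultCapabilities(acid));            // managed insert-only
        System.out.println(defaultCapabilities(new HashMap<>())); // EXTREAD,EXTWRITE
      }
    }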
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index e7c5c9f..c23a446 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -35,6 +35,7 @@ import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.FeDb;
import org.apache.impala.catalog.Function;
+import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.JniUtil;
@@ -120,6 +121,10 @@ public class JniCatalog {
final AuthorizationConfig authzConfig = authzFactory.getAuthorizationConfig();
+ if (MetastoreShim.getMajorVersion() > 2) {
+ MetastoreShim.setHiveClientCapabilities();
+ }
+
catalog_ = new CatalogServiceCatalog(cfg.load_catalog_in_background,
cfg.num_metadata_loading_threads, cfg.initial_hms_cnxn_timeout_s, getServiceId(),
cfg.local_library_path);
diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
index a1e58c0..b0c182f 100644
--- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.HdfsTable;
@@ -346,4 +347,12 @@ public class MetaStoreUtil {
msClient.fireListenerEvent(rqst);
}
+
+ /**
+ * Checks whether the HMS table is bucketed.
+ */
+ public static boolean isBucketedTable(Table msTbl) {
+ Preconditions.checkNotNull(msTbl);
+ return msTbl.getSd().getNumBuckets() > 0;
+ }
}
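A small sketch of the isBucketedTable() check above. Only a positive bucket
count marks a table as bucketed, which matters because, per the HIVEBUCKET2
comment earlier, Hive can report bucket number -1 for tables read without
that capability:

    public class BucketCountDemo {
      // Mirrors MetaStoreUtil.isBucketedTable() on the raw bucket count.
      static boolean isBucketed(int numBuckets) { return numBuckets > 0; }

      public static void main(String[] args) {
        System.out.println(isBucketed(5));  // true: CLUSTERED BY ... INTO 5 BUCKETS
        System.out.println(isBucketed(0));  // false: unbucketed table
        System.out.println(isBucketed(-1)); // false: bucket count unknown
      }
    }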
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index d5f42f8..8e3993c 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -877,4 +877,46 @@ public class AnalyzerTest extends FrontendTestBase {
FunctionUtils.resolveFunction(Arrays.asList(fns), fnDate,
Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF));
}
+
+ @Test
+ public void testAnalyzeBucketed() {
+ AnalyzesOk("select count(*) from functional.bucketed_table");
+ AnalyzesOk("select count(*) from functional.bucketed_ext_table");
+ AnalyzesOk("drop stats functional.bucketed_table");
+ AnalyzesOk("describe functional.bucketed_table");
+ AnalyzesOk("show column stats functional.bucketed_table");
+ AnalyzesOk("create table test as select * from functional.bucketed_table");
+ AnalyzesOk("compute stats functional.bucketed_table");
+
+ String errorMsgBucketed = "functional.bucketed_table " +
+ "is a bucketed table. Only read operations are supported on such tables.";
+ String errorMsgExtBucketed = "functional.bucketed_ext_table " +
+ "is a bucketed table. Only read operations are supported on such tables.";
+ String errorMsgInsertOnlyBucketed =
+ "functional.insert_only_transactional_bucketed_table " +
+ "is a bucketed table. Only read operations are supported on such tables.";
+
+ if (MetastoreShim.getMajorVersion() > 2) {
+ AnalyzesOk(
+ "select count(*) from functional.insert_only_transactional_bucketed_table");
+ AnalysisError("insert into functional.insert_only_transactional_bucketed_table " +
+ "select * from functional.insert_only_transactional_bucketed_table",
+ errorMsgInsertOnlyBucketed);
+ // Kept separate from the Hive 2 path because the error message may
+ // differ once Hive provides the information needed for error messages.
+ AnalysisError("insert into functional.bucketed_ext_table select * from " +
+ "functional.bucketed_ext_table", errorMsgExtBucketed);
+ } else {
+ AnalysisError("insert into functional.bucketed_ext_table select * from " +
+ "functional.bucketed_ext_table", errorMsgExtBucketed);
+ }
+ AnalysisError("insert into functional.bucketed_table select * from " +
+ "functional.bucketed_table", errorMsgBucketed);
+ AnalysisError("create table test like functional.bucketed_table", errorMsgBucketed);
+ AnalysisError("drop table functional.bucketed_table", errorMsgBucketed);
+ AnalysisError("truncate table functional.bucketed_table", errorMsgBucketed);
+ AnalysisError("alter table functional.bucketed_table add columns(col3 int)",
+ errorMsgBucketed);
+ }
}
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index d63bb7f..f67df17 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -4015,4 +4015,10 @@ public class ParserTest extends FrontendTestBase {
ParsesOk("--test\nSELECT 1\n");
ParsesOk("--test\nSELECT 1\n ");
}
+
+ @Test
+ public void TestCreateBucketedTable() {
+ ParserError("Create table bucketed_tbl(order_id int, order_name string)"
+ + "clustered by (order_id) into 5 buckets", "Syntax error");
+ }
}
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index 124f6da..637a114 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -23,6 +23,7 @@ import org.apache.impala.authorization.AuthorizationPolicy;
import org.apache.impala.authorization.NoopAuthorizationFactory.NoopAuthorizationManager;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.MetaStoreClientPool;
+import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TUniqueId;
@@ -61,6 +62,9 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
FeSupport.loadLibrary();
CatalogServiceCatalog cs;
try {
+ if (MetastoreShim.getMajorVersion() > 2) {
+ MetastoreShim.setHiveClientCapabilities();
+ }
cs = new CatalogServiceTestCatalog(false, 16, new TUniqueId(),
new MetaStoreClientPool(0, 0));
cs.setAuthzManager(authzFactory.newAuthorizationManager(cs));
@@ -81,6 +85,9 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
FeSupport.loadLibrary();
Path derbyPath = Paths.get(System.getProperty("java.io.tmpdir"),
UUID.randomUUID().toString());
+ if (MetastoreShim.getMajorVersion() > 2) {
+ MetastoreShim.setHiveClientCapabilities();
+ }
CatalogServiceCatalog cs = new CatalogServiceTestCatalog(false, 16,
new TUniqueId(), new EmbeddedMetastoreClientPool(0, derbyPath));
cs.setAuthzManager(new NoopAuthorizationManager());
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 79d551d..d460eaa 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2162,6 +2162,46 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS {db_name}{db_suffix}.{table_name}
---- DATASET
functional
---- BASE_TABLE_NAME
+insert_only_transactional_bucketed_table
+---- HIVE_MAJOR_VERSION
+3
+---- CREATE_HIVE
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+ col1 int, col2 int)
+CLUSTERED BY (col1) INTO 5 BUCKETS
+STORED AS ORC
+TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only');
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+bucketed_ext_table
+---- CREATE_HIVE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+ col1 int, col2 int)
+CLUSTERED BY (col1) INTO 5 BUCKETS
+STORED AS {file_format}
+LOCATION '/test-warehouse/{db_name}{db_suffix}{table_name}';
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+bucketed_table
+---- CREATE_HIVE
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+ col1 int, col2 int)
+CLUSTERED BY (col1) INTO 5 BUCKETS
+STORED AS {file_format};
+---- LOAD
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
+SELECT id, int_col from functional.alltypes;
+---- DEPENDENT_LOAD_HIVE
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
+SELECT * from functional.{table_name};
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
testescape_16_lf
---- CREATE
CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 0546c73..0c156c9 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -254,3 +254,13 @@ table_name:insert_only_transactional_table, constraint:exclude, table_format:kud
# excluded.
table_name:materialized_view, constraint:exclude, table_format:hbase/none/none
table_name:materialized_view, constraint:exclude, table_format:kudu/none/none
+
+table_name:insert_only_transactional_bucketed_table, constraint:exclude, table_format:hbase/none/none
+table_name:insert_only_transactional_bucketed_table, constraint:exclude, table_format:kudu/none/none
+
+# Bucketed tables only work for file-format based tables
+table_name:bucketed_ext_table, constraint:exclude, table_format:hbase/none/none
+table_name:bucketed_ext_table, constraint:exclude, table_format:kudu/none/none
+table_name:bucketed_table, constraint:exclude, table_format:hbase/none/none
+table_name:bucketed_table, constraint:exclude, table_format:kudu/none/none
+table_name:bucketed_table, constraint:exclude, table_format:text/lzo/block
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index dce0b69..e5dfc25 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -23,10 +23,11 @@ import time
from test_ddl_base import TestDdlBase
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.environ import (HIVE_MAJOR_VERSION)
from tests.common.impala_test_suite import LOG
from tests.common.parametrize import UniqueDatabase
from tests.common.skip import (SkipIf, SkipIfABFS, SkipIfADLS, SkipIfKudu, SkipIfLocal,
- SkipIfCatalogV2)
+ SkipIfCatalogV2, SkipIfHive2)
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.util.filesystem_utils import (
WAREHOUSE,
@@ -665,6 +666,9 @@ class TestDdlStatements(TestDdlBase):
tblproperties ('p1'='v0', 'p1'='v1')""".format(fq_tbl_name))
properties = self._get_tbl_properties(fq_tbl_name)
+ if HIVE_MAJOR_VERSION > 2:
+ assert properties['OBJCAPABILITIES'] == 'EXTREAD,EXTWRITE'
+ del properties['OBJCAPABILITIES']
assert len(properties) == 2
# The transient_lastDdlTime is variable, so don't verify the value.
assert 'transient_lastDdlTime' in properties
@@ -685,12 +689,23 @@ class TestDdlStatements(TestDdlBase):
"('prop1'='val1', 'p2'='val2', 'p2'='val3', ''='')".format(fq_tbl_name))
properties = self._get_tbl_properties(fq_tbl_name)
+ if HIVE_MAJOR_VERSION > 2:
+ assert 'OBJCAPABILITIES' in properties
assert 'transient_lastDdlTime' in properties
assert properties['p1'] == 'v1'
assert properties['prop1'] == 'val1'
assert properties['p2'] == 'val3'
assert properties[''] == ''
+ @SkipIfHive2.acid
+ def test_create_insertonly_tbl(self, vector, unique_database):
+ insertonly_tbl = unique_database + ".test_insertonly"
+ self.client.execute("""create table {0} (coli int) stored as parquet tblproperties(
+ 'transactional'='true', 'transactional_properties'='insert_only')"""
+ .format(insertonly_tbl))
+ properties = self._get_tbl_properties(insertonly_tbl)
+ assert properties['OBJCAPABILITIES'] == 'HIVEMANAGEDINSERTREAD,HIVEMANAGEDINSERTWRITE'
+
def test_alter_tbl_properties_reload(self, vector, unique_database):
# IMPALA-8734: Force a table schema reload when setting table properties.
tbl_name = "test_tbl"
diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py
index 3ce2748..2a3079c 100644
--- a/tests/metadata/test_show_create_table.py
+++ b/tests/metadata/test_show_create_table.py
@@ -36,7 +36,7 @@ class TestShowCreateTable(ImpalaTestSuite):
"numRows", "rawDataSize", "totalSize", "COLUMN_STATS_ACCURATE",
"STATS_GENERATED_VIA_STATS_TASK", "last_modified_by",
"last_modified_time", "numFilesErasureCoded",
- "bucketing_version"]
+ "bucketing_version", "OBJCAPABILITIES"]
@classmethod
def get_workload(self):