You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/22 05:33:33 UTC
[06/14] incubator-impala git commit: IMPALA-3719: Simplify CREATE
TABLE statements with Kudu tables
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index d55f8da..d0185b7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -19,21 +19,19 @@ package org.apache.impala.catalog;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import javax.xml.bind.DatatypeConverter;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.log4j.Logger;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.LocatedTablet;
-
+import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.DistributeParam;
+import org.apache.impala.analysis.ToSqlUtils;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TDistributeByHashParam;
+import org.apache.impala.thrift.TDistributeByRangeParam;
+import org.apache.impala.thrift.TDistributeParam;
import org.apache.impala.thrift.TKuduTable;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TResultSetMetadata;
@@ -42,76 +40,86 @@ import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableType;
import org.apache.impala.util.KuduUtil;
import org.apache.impala.util.TResultRowBuilder;
+import org.apache.impala.service.CatalogOpExecutor;
+
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.PartitionSchema.HashBucketSchema;
+import org.apache.kudu.client.PartitionSchema.RangeSchema;
+import org.apache.kudu.client.PartitionSchema;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
/**
- * Impala representation of a Kudu table.
- *
- * The Kudu-related metadata is stored in the Metastore table's table properties.
+ * Representation of a Kudu table in the catalog cache.
*/
public class KuduTable extends Table {
- private static final Logger LOG = Logger.getLogger(Table.class);
+
+ private static final Logger LOG = Logger.getLogger(KuduTable.class);
// Alias to the string key that identifies the storage handler for Kudu tables.
public static final String KEY_STORAGE_HANDLER =
hive_metastoreConstants.META_TABLE_STORAGE;
- // Key to access the table name from the table properties
+ // Key to access the table name from the table properties.
public static final String KEY_TABLE_NAME = "kudu.table_name";
// Key to access the columns used to build the (composite) key of the table.
- // The order of the keys is important.
+ // Deprecated - Used only for error checking.
public static final String KEY_KEY_COLUMNS = "kudu.key_columns";
- // Key to access the master address from the table properties. Error handling for
+ // Key to access the master hosts from the table properties. Error handling for
// this string is done in the KuduClient library.
- // TODO we should have something like KuduConfig.getDefaultConfig()
- public static final String KEY_MASTER_ADDRESSES = "kudu.master_addresses";
+ // TODO: Renaming kudu.master_addresses to kudu.master_host would break compatibility
+ // with older versions.
+ public static final String KEY_MASTER_HOSTS = "kudu.master_addresses";
// Kudu specific value for the storage handler table property keyed by
// KEY_STORAGE_HANDLER.
+ // TODO: Fix the storage handler name (see IMPALA-4271).
public static final String KUDU_STORAGE_HANDLER =
"com.cloudera.kudu.hive.KuduStorageHandler";
// Key to specify the number of tablet replicas.
- // TODO(KUDU): Allow modification in alter table.
public static final String KEY_TABLET_REPLICAS = "kudu.num_tablet_replicas";
public static final long KUDU_RPC_TIMEOUT_MS = 50000;
- // The name of the table in Kudu.
+ // Table name in the Kudu storage engine. It may not necessarily be the same as the
+ // table name specified in the CREATE TABLE statement; the latter
+ // is stored in Table.name_. Reasons why KuduTable.kuduTableName_ and Table.name_ may
+ // differ:
+ // 1. For managed tables, 'kuduTableName_' is prefixed with 'impala::<db_name>' to
+ // avoid conflicts. TODO: Remove this when Kudu supports databases.
+ // 2. The user may specify a table name using the 'kudu.table_name' table property.
private String kuduTableName_;
// Comma separated list of Kudu master hosts with optional ports.
private String kuduMasters_;
- // The set of columns that are key columns in Kudu.
- private ImmutableList<String> kuduKeyColumnNames_;
+ // Primary key column names.
+ private final List<String> primaryKeyColumnNames_ = Lists.newArrayList();
+
+ // Distribution schemes of this Kudu table. Both range and hash-based distributions are
+ // supported.
+ private final List<DistributeParam> distributeBy_ = Lists.newArrayList();
protected KuduTable(TableId id, org.apache.hadoop.hive.metastore.api.Table msTable,
Db db, String name, String owner) {
super(id, msTable, db, name, owner);
- }
-
- public TKuduTable getKuduTable() {
- TKuduTable tbl = new TKuduTable();
- tbl.setKey_columns(Preconditions.checkNotNull(kuduKeyColumnNames_));
- tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(",")));
- tbl.setTable_name(kuduTableName_);
- return tbl;
- }
-
- @Override
- public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
- TTableDescriptor desc = new TTableDescriptor(id_.asInt(), TTableType.KUDU_TABLE,
- getTColumnDescriptors(), numClusteringCols_, kuduTableName_, db_.getName());
- desc.setKuduTable(getKuduTable());
- return desc;
+ kuduTableName_ = msTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
+ kuduMasters_ = msTable.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
}
@Override
@@ -126,78 +134,149 @@ public class KuduTable extends Table {
@Override
public ArrayList<Column> getColumnsInHiveOrder() { return getColumns(); }
- public static boolean isKuduTable(org.apache.hadoop.hive.metastore.api.Table mstbl) {
- return KUDU_STORAGE_HANDLER.equals(mstbl.getParameters().get(KEY_STORAGE_HANDLER));
+ public static boolean isKuduTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
+ return KUDU_STORAGE_HANDLER.equals(msTbl.getParameters().get(KEY_STORAGE_HANDLER));
+ }
+
+ public String getKuduTableName() { return kuduTableName_; }
+ public String getKuduMasterHosts() { return kuduMasters_; }
+
+ public List<String> getPrimaryKeyColumnNames() {
+ return ImmutableList.copyOf(primaryKeyColumnNames_);
+ }
+
+ public List<DistributeParam> getDistributeBy() {
+ return ImmutableList.copyOf(distributeBy_);
}
/**
- * Load the columns from the schema list
+ * Loads the metadata of a Kudu table.
+ *
+ * Schema and distribution schemes are loaded directly from Kudu whereas column stats
+ * are loaded from HMS. The function also updates the table schema in HMS in order to
+ * propagate alterations made to the Kudu table to HMS.
*/
- private void loadColumns(List<FieldSchema> schema, IMetaStoreClient client,
- Set<String> keyColumns) throws TableLoadingException {
+ @Override
+ public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
+ org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+ msTable_ = msTbl;
+ // This is set to 0 for Kudu tables.
+ // TODO: Change this to reflect the number of pk columns and modify all the
+ // places (e.g. insert stmt) that currently make use of this parameter.
+ numClusteringCols_ = 0;
+ kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME);
+ Preconditions.checkNotNull(kuduTableName_);
+ kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+ Preconditions.checkNotNull(kuduMasters_);
+ org.apache.kudu.client.KuduTable kuduTable = null;
+ numRows_ = getRowCount(msTable_.getParameters());
+
+ // Connect to Kudu to retrieve table metadata
+ try (KuduClient kuduClient = new KuduClient.KuduClientBuilder(
+ getKuduMasterHosts()).build()) {
+ kuduTable = kuduClient.openTable(kuduTableName_);
+ } catch (KuduException e) {
+ LOG.error("Error accessing Kudu table " + kuduTableName_);
+ throw new TableLoadingException(e.getMessage());
+ }
+ Preconditions.checkNotNull(kuduTable);
+
+ // Load metadata from Kudu and HMS
+ try {
+ loadSchema(kuduTable);
+ loadDistributeByParams(kuduTable);
+ loadAllColumnStats(msClient);
+ } catch (ImpalaRuntimeException e) {
+ LOG.error("Error loading metadata for Kudu table: " + kuduTableName_);
+ throw new TableLoadingException("Error loading metadata for Kudu table " +
+ kuduTableName_, e);
+ }
- if (keyColumns.size() == 0 || keyColumns.size() > schema.size()) {
- throw new TableLoadingException(String.format("Kudu tables must have at least one"
- + "key column (had %d), and no more key columns than there are table columns "
- + "(had %d).", keyColumns.size(), schema.size()));
+ // Update the table schema in HMS.
+ try {
+ long lastDdlTime = CatalogOpExecutor.calculateDdlTime(msTable_);
+ msTable_.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime));
+ msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS,
+ StatsSetupConst.TRUE);
+ msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_);
+ } catch (TException e) {
+ throw new TableLoadingException(e.getMessage());
}
+ }
+ /**
+ * Loads the schema from the Kudu table including column definitions and primary key
+ * columns. Replaces the columns in the HMS table with the columns from the Kudu table.
+ * Throws an ImpalaRuntimeException if Kudu column data types cannot be mapped to
+ * Impala data types.
+ */
+ private void loadSchema(org.apache.kudu.client.KuduTable kuduTable)
+ throws ImpalaRuntimeException {
+ Preconditions.checkNotNull(kuduTable);
clearColumns();
- Set<String> columnNames = Sets.newHashSet();
+ primaryKeyColumnNames_.clear();
+ List<FieldSchema> cols = msTable_.getSd().getCols();
+ cols.clear();
int pos = 0;
- for (FieldSchema field: schema) {
- org.apache.impala.catalog.Type type = parseColumnType(field);
- // TODO(kudu-merge): Check for decimal types?
- boolean isKey = keyColumns.contains(field.getName());
- KuduColumn col = new KuduColumn(field.getName(), isKey, !isKey, type,
- field.getComment(), pos);
- columnNames.add(col.getName());
- addColumn(col);
+ for (ColumnSchema colSchema: kuduTable.getSchema().getColumns()) {
+ Type type = KuduUtil.toImpalaType(colSchema.getType());
+ String colName = colSchema.getName();
+ cols.add(new FieldSchema(colName, type.toSql().toLowerCase(), null));
+ boolean isKey = colSchema.isKey();
+ if (isKey) primaryKeyColumnNames_.add(colName);
+ addColumn(new KuduColumn(colName, isKey, !isKey, type, null, pos));
++pos;
}
+ }
- if (!columnNames.containsAll(keyColumns)) {
- throw new TableLoadingException(String.format("Some key columns were not found in"
- + " the set of columns. List of column names: %s, List of key column names:"
- + " %s", Iterables.toString(columnNames), Iterables.toString(keyColumns)));
+ private void loadDistributeByParams(org.apache.kudu.client.KuduTable kuduTable) {
+ Preconditions.checkNotNull(kuduTable);
+ PartitionSchema partitionSchema = kuduTable.getPartitionSchema();
+ Preconditions.checkState(!colsByPos_.isEmpty());
+ distributeBy_.clear();
+ for (HashBucketSchema hashBucketSchema: partitionSchema.getHashBucketSchemas()) {
+ List<String> columnNames = Lists.newArrayList();
+ for (int colPos: hashBucketSchema.getColumnIds()) {
+ columnNames.add(colsByPos_.get(colPos).getName());
+ }
+ distributeBy_.add(
+ DistributeParam.createHashParam(columnNames, hashBucketSchema.getNumBuckets()));
}
-
- kuduKeyColumnNames_ = ImmutableList.copyOf(keyColumns);
-
- loadAllColumnStats(client);
+ RangeSchema rangeSchema = partitionSchema.getRangeSchema();
+ List<Integer> columnIds = rangeSchema.getColumns();
+ if (columnIds.isEmpty()) return;
+ List<String> columnNames = Lists.newArrayList();
+ for (int colPos: columnIds) columnNames.add(colsByPos_.get(colPos).getName());
+ // We don't populate the split values because Kudu's API doesn't currently support
+ // retrieving the split values for range partitions.
+ // TODO: File a Kudu JIRA.
+ distributeBy_.add(DistributeParam.createRangeParam(columnNames, null));
}
- @Override
- public void load(boolean reuseMetadata, IMetaStoreClient client,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
- // TODO handle 'reuseMetadata'
- if (getMetaStoreTable() == null || !tableParamsAreValid(msTbl.getParameters())) {
- throw new TableLoadingException(String.format(
- "Cannot load Kudu table %s, table is corrupt.", name_));
+ /**
+ * Creates a temporary KuduTable object populated with the specified properties. The
+ * object has an invalid TableId and is added to neither the Kudu storage engine nor
+ * the HMS. This is used for CTAS statements.
+ */
+ public static KuduTable createCtasTarget(Db db,
+ org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs,
+ List<String> primaryKeyColumnNames, List<DistributeParam> distributeParams) {
+ KuduTable tmpTable = new KuduTable(TableId.createInvalidId(), msTbl, db,
+ msTbl.getTableName(), msTbl.getOwner());
+ int pos = 0;
+ for (ColumnDef colDef: columnDefs) {
+ tmpTable.addColumn(new Column(colDef.getColName(), colDef.getType(), pos++));
}
-
- msTable_ = msTbl;
- kuduTableName_ = msTbl.getParameters().get(KEY_TABLE_NAME);
- kuduMasters_ = msTbl.getParameters().get(KEY_MASTER_ADDRESSES);
-
- String keyColumnsProp = Preconditions.checkNotNull(msTbl.getParameters()
- .get(KEY_KEY_COLUMNS).toLowerCase(), "'kudu.key_columns' cannot be null.");
- Set<String> keyColumns = KuduUtil.parseKeyColumns(keyColumnsProp);
-
- // Load the rest of the data from the table parameters directly
- loadColumns(msTbl.getSd().getCols(), client, keyColumns);
-
- numClusteringCols_ = 0;
-
- // Get row count from stats
- numRows_ = getRowCount(getMetaStoreTable().getParameters());
+ tmpTable.primaryKeyColumnNames_.addAll(primaryKeyColumnNames);
+ tmpTable.distributeBy_.addAll(distributeParams);
+ return tmpTable;
}
@Override
public TTable toThrift() {
TTable table = super.toThrift();
table.setTable_type(TTableType.KUDU_TABLE);
- table.setKudu_table(getKuduTable());
+ table.setKudu_table(getTKuduTable());
return table;
}
@@ -207,33 +286,46 @@ public class KuduTable extends Table {
TKuduTable tkudu = thriftTable.getKudu_table();
kuduTableName_ = tkudu.getTable_name();
kuduMasters_ = Joiner.on(',').join(tkudu.getMaster_addresses());
- kuduKeyColumnNames_ = ImmutableList.copyOf(tkudu.getKey_columns());
+ primaryKeyColumnNames_.clear();
+ primaryKeyColumnNames_.addAll(tkudu.getKey_columns());
+ loadDistributeByParamsFromThrift(tkudu.getDistribute_by());
}
- public String getKuduTableName() { return kuduTableName_; }
- public String getKuduMasterAddresses() { return kuduMasters_; }
- public int getNumKeyColumns() { return kuduKeyColumnNames_.size(); }
-
- /**
- * Returns true if all required parameters are present in the given table properties
- * map.
- * TODO(kudu-merge) Return a more specific error string.
- */
- public static boolean tableParamsAreValid(Map<String, String> params) {
- return params.get(KEY_TABLE_NAME) != null && params.get(KEY_TABLE_NAME).length() > 0
- && params.get(KEY_MASTER_ADDRESSES) != null
- && params.get(KEY_MASTER_ADDRESSES).length() > 0
- && params.get(KEY_KEY_COLUMNS) != null
- && params.get(KEY_KEY_COLUMNS).length() > 0;
- }
+ private void loadDistributeByParamsFromThrift(List<TDistributeParam> params) {
+ distributeBy_.clear();
+ for (TDistributeParam param: params) {
+ if (param.isSetBy_hash_param()) {
+ TDistributeByHashParam hashParam = param.getBy_hash_param();
+ distributeBy_.add(DistributeParam.createHashParam(
+ hashParam.getColumns(), hashParam.getNum_buckets()));
+ } else {
+ Preconditions.checkState(param.isSetBy_range_param());
+ TDistributeByRangeParam rangeParam = param.getBy_range_param();
+ distributeBy_.add(DistributeParam.createRangeParam(rangeParam.getColumns(),
+ null));
+ }
+ }
+ }
- /**
- * The number of nodes is not know ahead of time and will be updated during computeStats
- * in the scan node.
- */
- public int getNumNodes() { return -1; }
+ @Override
+ public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
+ TTableDescriptor desc = new TTableDescriptor(id_.asInt(), TTableType.KUDU_TABLE,
+ getTColumnDescriptors(), numClusteringCols_, kuduTableName_, db_.getName());
+ desc.setKuduTable(getTKuduTable());
+ return desc;
+ }
- public List<String> getKuduKeyColumnNames() { return kuduKeyColumnNames_; }
+ private TKuduTable getTKuduTable() {
+ TKuduTable tbl = new TKuduTable();
+ tbl.setKey_columns(Preconditions.checkNotNull(primaryKeyColumnNames_));
+ tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(",")));
+ tbl.setTable_name(kuduTableName_);
+ Preconditions.checkNotNull(distributeBy_);
+ for (DistributeParam distributeParam: distributeBy_) {
+ tbl.addToDistribute_by(distributeParam.toThrift());
+ }
+ return tbl;
+ }
public TResultSet getTableStats() throws ImpalaRuntimeException {
TResultSet result = new TResultSet();
@@ -247,7 +339,7 @@ public class KuduTable extends Table {
resultSchema.addToColumns(new TColumn("# Replicas", Type.INT.toThrift()));
try (KuduClient client = new KuduClient.KuduClientBuilder(
- getKuduMasterAddresses()).build()) {
+ getKuduMasterHosts()).build()) {
org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_);
List<LocatedTablet> tablets =
kuduTable.getTabletsLocations(KUDU_RPC_TIMEOUT_MS);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 6145cc5..4b40b44 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -455,6 +455,11 @@ public abstract class Table implements CatalogObject {
@Override
public boolean isLoaded() { return true; }
+ public static boolean isExternalTable(
+ org.apache.hadoop.hive.metastore.api.Table msTbl) {
+ return msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
+ }
+
/**
* If the table is cached, it returns a <cache pool name, replication factor> pair
* and adds the table cached directive ID to 'cacheDirIds'. Otherwise, it
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index 764abe0..8541a3a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -18,7 +18,6 @@
package org.apache.impala.catalog;
import java.util.EnumSet;
-import java.util.Set;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/Type.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Type.java b/fe/src/main/java/org/apache/impala/catalog/Type.java
index 91fc2e3..05c71c7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Type.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Type.java
@@ -317,6 +317,15 @@ public abstract class Type {
}
/**
+ * Checks if types t1 and t2 are assignment compatible, i.e. if both t1 and t2 can be
+ * assigned to a type t without an explicit cast and without any conversions that would
+ * result in loss of precision.
+ */
+ public static boolean areAssignmentCompatibleTypes(Type t1, Type t2) {
+ return getAssignmentCompatibleType(t1, t2, true) != ScalarType.INVALID;
+ }
+
+ /**
* Returns true if this type exceeds the MAX_NESTING_DEPTH, false otherwise.
*/
public boolean exceedsMaxNestingDepth() { return exceedsMaxNestingDepth(0); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/delegates/DdlDelegate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/delegates/DdlDelegate.java b/fe/src/main/java/org/apache/impala/catalog/delegates/DdlDelegate.java
deleted file mode 100644
index 6c3ba8e..0000000
--- a/fe/src/main/java/org/apache/impala/catalog/delegates/DdlDelegate.java
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog.delegates;
-
-import java.util.List;
-
-import org.apache.impala.thrift.TDistributeParam;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import org.apache.impala.common.ImpalaRuntimeException;
-import org.apache.impala.thrift.TAlterTableParams;
-
-/**
- * Abstract class to implement the storage specific portion of DDL requests.
- *
- * During catalog DDL operations the CatalogOpExecutor will instantiate the correct
- * subclass of this class to handle the DDL operation to the storage backend. See,
- * CatalogOpExecutor::createDDLDelegate() for details.
- *
- */
-public abstract class DdlDelegate {
-
- protected Table msTbl_;
- protected TAlterTableParams tAlterTableParams_;
- protected List<TDistributeParam> distributeParams_;
-
- /**
- * Creates a new delegate to modify Table 'msTbl'.
- */
- public DdlDelegate setMsTbl(Table msTbl) {
- msTbl_ = msTbl;
- return this;
- }
-
- public DdlDelegate setAlterTableParams(TAlterTableParams p) {
- tAlterTableParams_ = p;
- return this;
- }
-
- public DdlDelegate setDistributeParams(List<TDistributeParam> p) {
- distributeParams_ = p;
- return this;
- }
-
- /**
- * Creates the table.
- */
- public abstract void createTable() throws ImpalaRuntimeException;
-
- /**
- * Drops the table.
- */
- public abstract void dropTable() throws ImpalaRuntimeException;
-
- /**
- * Performs an alter table with the parameters set with setAlterTableParams().
- */
- public abstract boolean alterTable() throws ImpalaRuntimeException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/delegates/KuduDdlDelegate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/delegates/KuduDdlDelegate.java b/fe/src/main/java/org/apache/impala/catalog/delegates/KuduDdlDelegate.java
deleted file mode 100644
index 8410868..0000000
--- a/fe/src/main/java/org/apache/impala/catalog/delegates/KuduDdlDelegate.java
+++ /dev/null
@@ -1,190 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog.delegates;
-
-import static org.apache.impala.util.KuduUtil.compareSchema;
-import static org.apache.impala.util.KuduUtil.fromImpalaType;
-import static org.apache.impala.util.KuduUtil.parseKeyColumns;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.PartialRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.impala.catalog.KuduTable;
-import org.apache.impala.common.ImpalaRuntimeException;
-import org.apache.impala.thrift.TDistributeParam;
-import org.apache.impala.util.KuduUtil;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-
-/**
- * Implementation of the Kudu DDL Delegate. Propagates create and drop table statements to
- * Kudu.
- */
-public class KuduDdlDelegate extends DdlDelegate {
-
- private static final Logger LOG = LoggerFactory.getLogger(KuduDdlDelegate.class);
-
- public KuduDdlDelegate(Table msTbl) {
- setMsTbl(msTbl);
- }
-
- /**
- * Creates the Kudu table if it does not exist and returns true. If the table exists and
- * the table is not a managed table ignore and return false, otherwise throw an
- * exception.
- */
- @Override
- public void createTable()
- throws ImpalaRuntimeException {
-
- String kuduTableName = msTbl_.getParameters().get(KuduTable.KEY_TABLE_NAME);
- String kuduMasters = msTbl_.getParameters().get(KuduTable.KEY_MASTER_ADDRESSES);
-
- // Can be optional for un-managed tables
- String kuduKeyCols = msTbl_.getParameters().get(KuduTable.KEY_KEY_COLUMNS);
-
- String replication = msTbl_.getParameters().get(KuduTable.KEY_TABLET_REPLICAS);
-
- try (KuduClient client = new KuduClient.KuduClientBuilder(kuduMasters).build()) {
- // TODO should we throw if the table does not exist when its an external table?
- if (client.tableExists(kuduTableName)) {
- if (msTbl_.getTableType().equals(TableType.MANAGED_TABLE.toString())) {
- throw new ImpalaRuntimeException(String.format(
- "Table %s already exists in Kudu master %s.", kuduTableName, kuduMasters));
- }
-
- // Check if the external table matches the schema
- org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName);
- if (!compareSchema(msTbl_, kuduTable)) {
- throw new ImpalaRuntimeException(String.format(
- "Table %s (%s) has a different schema in Kudu than in Hive.",
- msTbl_.getTableName(), kuduTableName));
- }
- return;
- }
-
- HashSet<String> keyColNames = parseKeyColumns(kuduKeyCols);
- List<ColumnSchema> keyColSchemas = new ArrayList<>();
-
- // Create a new Schema and map the types accordingly
- ArrayList<ColumnSchema> columns = Lists.newArrayList();
- for (FieldSchema fieldSchema: msTbl_.getSd().getCols()) {
- org.apache.impala.catalog.Type catalogType = org.apache.impala.catalog.Type
- .parseColumnType(fieldSchema.getType());
- if (catalogType == null) {
- throw new ImpalaRuntimeException(String.format(
- "Could not parse column type %s.", fieldSchema.getType()));
- }
- Type t = fromImpalaType(catalogType);
- // Create the actual column and check if the column is a key column
- ColumnSchemaBuilder csb = new ColumnSchemaBuilder(
- fieldSchema.getName(), t);
- boolean isKeyColumn = keyColNames.contains(fieldSchema.getName());
- csb.key(isKeyColumn);
- csb.nullable(!isKeyColumn);
- ColumnSchema cs = csb.build();
- columns.add(cs);
- if (isKeyColumn) keyColSchemas.add(cs);
- }
-
- Schema schema = new Schema(columns);
- CreateTableOptions cto = new CreateTableOptions();
-
- // Handle auto-partitioning of the Kudu table
- if (distributeParams_ != null) {
- for (TDistributeParam param : distributeParams_) {
- if (param.isSetBy_hash_param()) {
- Preconditions.checkState(!param.isSetBy_range_param());
- cto.addHashPartitions(param.getBy_hash_param().getColumns(),
- param.getBy_hash_param().getNum_buckets());
- } else {
- Preconditions.checkState(param.isSetBy_range_param());
- cto.setRangePartitionColumns(param.getBy_range_param().getColumns());
- for (PartialRow p : KuduUtil.parseSplits(schema, param.getBy_range_param())) {
- cto.addSplitRow(p);
- }
- }
- }
- }
-
- if (!Strings.isNullOrEmpty(replication)) {
- int r = Integer.parseInt(replication);
- if (r <= 0) {
- throw new ImpalaRuntimeException(
- "Number of tablet replicas must be greater than zero. " +
- "Given number of replicas is: " + Integer.toString(r));
- }
- cto.setNumReplicas(r);
- }
-
- client.createTable(kuduTableName, schema, cto);
- } catch (ImpalaRuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new ImpalaRuntimeException("Error creating Kudu table", e);
- }
- }
-
- @Override
- public void dropTable() throws ImpalaRuntimeException {
- // If table is an external table, do not delete the data
- if (msTbl_.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) return;
-
- String kuduTableName = msTbl_.getParameters().get(KuduTable.KEY_TABLE_NAME);
- String kuduMasters = msTbl_.getParameters().get(KuduTable.KEY_MASTER_ADDRESSES);
-
- try (KuduClient client = new KuduClient.KuduClientBuilder(kuduMasters).build()) {
- if (!client.tableExists(kuduTableName)) {
- LOG.warn("Table: %s is in inconsistent state. It does not exist in Kudu master(s)"
- + " %s, but it exists in Hive metastore. Deleting from metastore only.",
- kuduTableName, kuduMasters);
- return;
- }
- client.deleteTable(kuduTableName);
- return;
- } catch (Exception e) {
- throw new ImpalaRuntimeException("Error dropping Kudu table", e);
- }
- }
-
- public static boolean canHandle(org.apache.hadoop.hive.metastore.api.Table msTbl) {
- return KuduTable.isKuduTable(msTbl);
- }
-
- @Override
- public boolean alterTable() throws ImpalaRuntimeException {
- throw new ImpalaRuntimeException(
- "Alter table operations are not supported for Kudu tables.");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/delegates/UnsupportedOpDelegate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/delegates/UnsupportedOpDelegate.java b/fe/src/main/java/org/apache/impala/catalog/delegates/UnsupportedOpDelegate.java
deleted file mode 100644
index 8aabaa4..0000000
--- a/fe/src/main/java/org/apache/impala/catalog/delegates/UnsupportedOpDelegate.java
+++ /dev/null
@@ -1,35 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog.delegates;
-
-import org.apache.impala.common.ImpalaRuntimeException;
-
-/**
- * Empty implementation for the DdlDelegate interface that does nothing.
- */
-public class UnsupportedOpDelegate extends DdlDelegate {
-
- @Override
- public void createTable() throws ImpalaRuntimeException { }
-
- @Override
- public void dropTable() throws ImpalaRuntimeException { }
-
- @Override
- public boolean alterTable() throws ImpalaRuntimeException { return true; }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java
index 3345c1b..8d15425 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java
@@ -52,7 +52,7 @@ public class HdfsPartitionFilter {
// lhs exprs of smap used in isMatch()
private final ArrayList<SlotRef> lhsSlotRefs_ = Lists.newArrayList();
- // indices into Table.getColumns()
+ // indices into Table.getColumnNames()
private final ArrayList<Integer> refdKeys_ = Lists.newArrayList();
public HdfsPartitionFilter(Expr predicate, HdfsTable tbl, Analyzer analyzer) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 64ef822..9434801 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -107,7 +107,7 @@ public class KuduScanNode extends ScanNode {
conjuncts_ = orderConjunctsByCost(conjuncts_);
try (KuduClient client =
- new KuduClientBuilder(kuduTable_.getKuduMasterAddresses()).build()) {
+ new KuduClientBuilder(kuduTable_.getKuduMasterHosts()).build()) {
org.apache.kudu.client.KuduTable rpcTable =
client.openTable(kuduTable_.getKuduTableName());
validateSchema(rpcTable);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 5743a59..54493d1 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -51,6 +51,11 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.impala.analysis.FunctionName;
import org.apache.impala.analysis.TableName;
@@ -61,7 +66,6 @@ import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.ColumnNotFoundException;
import org.apache.impala.catalog.DataSource;
-import org.apache.impala.catalog.DatabaseNotFoundException;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.HBaseTable;
@@ -70,6 +74,7 @@ import org.apache.impala.catalog.HdfsPartition;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.HiveStorageDescriptorFactory;
import org.apache.impala.catalog.IncompleteTable;
+import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.PartitionNotFoundException;
import org.apache.impala.catalog.PartitionStatsUtil;
@@ -82,9 +87,6 @@ import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.TableNotFoundException;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.View;
-import org.apache.impala.catalog.delegates.DdlDelegate;
-import org.apache.impala.catalog.delegates.KuduDdlDelegate;
-import org.apache.impala.catalog.delegates.UnsupportedOpDelegate;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
@@ -121,7 +123,6 @@ import org.apache.impala.thrift.TCreateTableParams;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TDdlExecRequest;
import org.apache.impala.thrift.TDdlExecResponse;
-import org.apache.impala.thrift.TDistributeParam;
import org.apache.impala.thrift.TDropDataSourceParams;
import org.apache.impala.thrift.TDropDbParams;
import org.apache.impala.thrift.TDropFunctionParams;
@@ -149,11 +150,6 @@ import org.apache.impala.thrift.TTruncateParams;
import org.apache.impala.thrift.TUpdateCatalogRequest;
import org.apache.impala.thrift.TUpdateCatalogResponse;
import org.apache.impala.util.HdfsCachingUtil;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
/**
* Class used to execute Catalog Operations, including DDL and refresh/invalidate
@@ -1103,8 +1099,7 @@ public class CatalogOpExecutor {
/**
* Drops a database from the metastore and removes the database's metadata from the
- * internal cache. Re-throws any Hive Meta Store exceptions encountered during
- * the drop.
+ * internal cache. Re-throws any HMS exceptions encountered during the drop.
*/
private void dropDatabase(TDropDbParams params, TDdlExecResponse resp)
throws ImpalaException {
@@ -1120,6 +1115,9 @@ public class CatalogOpExecutor {
TCatalogObject removedObject = new TCatalogObject();
synchronized (metastoreDdlLock_) {
+ // Remove all the Kudu tables of 'db' from the Kudu storage engine.
+ if (db != null && params.cascade) dropTablesFromKudu(db);
+
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
msClient.getHiveClient().dropDatabase(
params.getDb(), true, params.if_exists, params.cascade);
@@ -1144,6 +1142,44 @@ public class CatalogOpExecutor {
}
/**
+ * Drops all the Kudu tables of database 'db' from the Kudu storage engine. Retrieves
+ * the Kudu table name of each table in 'db' from HMS. Throws an ImpalaException if
+ * metadata for Kudu tables cannot be loaded from HMS or if an error occurs while
+ * trying to drop a table from Kudu.
+ */
+ private void dropTablesFromKudu(Db db) throws ImpalaException {
+ // Unloaded tables have no cached HMS object, so fetch those from HMS. Failing to
+ // fetch them aborts the drop rather than silently orphaning their Kudu tables.
+ List<String> incompleteTableNames = Lists.newArrayList();
+ List<org.apache.hadoop.hive.metastore.api.Table> msTables = Lists.newArrayList();
+ for (Table table: db.getTables()) {
+ org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable();
+ if (msTable == null) {
+ incompleteTableNames.add(table.getName());
+ } else {
+ msTables.add(msTable);
+ }
+ }
+ if (!incompleteTableNames.isEmpty()) {
+ try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+ msTables.addAll(msClient.getHiveClient().getTableObjectsByName(
+ db.getName(), incompleteTableNames));
+ } catch (TException e) {
+ throw new ImpalaRuntimeException(
+ String.format(HMS_RPC_ERROR_FORMAT_STR, "getTableObjectsByName"), e);
+ }
+ }
+ for (org.apache.hadoop.hive.metastore.api.Table msTable: msTables) {
+ if (!KuduTable.isKuduTable(msTable) || Table.isExternalTable(msTable)) continue;
+ // The operation will be aborted if the Kudu table cannot be dropped. If for
+ // some reason Kudu is permanently stuck in a non-functional state, the user is
+ // expected to ALTER TABLE to either set the table to UNMANAGED or set the format
+ // to something else.
+ KuduCatalogOpExecutor.dropTable(msTable, /*if exists*/ true);
+ }
+ }
+
+ /**
* Drops a table or view from the metastore and removes it from the catalog.
* Also drops all associated caching requests on the table and/or table's partitions,
* uncaching all table data. If params.purge is true, table data is permanently
@@ -1157,17 +1193,6 @@ public class CatalogOpExecutor {
TCatalogObject removedObject = new TCatalogObject();
synchronized (metastoreDdlLock_) {
-
- // Forward the DDL operation to the specified storage backend.
- try {
- org.apache.hadoop.hive.metastore.api.Table msTbl = getExistingTable(
- tableName.getDb(), tableName.getTbl()).getMetaStoreTable();
- DdlDelegate handler = createDdlDelegate(msTbl);
- handler.dropTable();
- } catch (TableNotFoundException | DatabaseNotFoundException e) {
- // Do nothing
- }
-
Db db = catalog_.getDb(params.getTable_name().db_name);
if (db == null) {
if (params.if_exists) return;
@@ -1179,6 +1204,23 @@ public class CatalogOpExecutor {
if (params.if_exists) return;
throw new CatalogException("Table/View does not exist: " + tableName);
}
+
+ // Retrieve the HMS table to determine if this is a Kudu table.
+ org.apache.hadoop.hive.metastore.api.Table msTbl = existingTbl.getMetaStoreTable();
+ if (msTbl == null) {
+ Preconditions.checkState(existingTbl instanceof IncompleteTable);
+ try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+ msTbl = msClient.getHiveClient().getTable(tableName.getDb(),
+ tableName.getTbl());
+ } catch (TException e) {
+ LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, "getTable") + e.getMessage());
+ }
+ }
+ if (msTbl != null && KuduTable.isKuduTable(msTbl)
+ && !Table.isExternalTable(msTbl)) {
+ KuduCatalogOpExecutor.dropTable(msTbl, /* if exists */ true);
+ }
+
// Check to make sure we don't drop a view with "drop table" statement and
// vice versa. is_table field is marked optional in TDropTableOrViewParams to
// maintain catalog api compatibility.
@@ -1343,7 +1385,8 @@ public class CatalogOpExecutor {
/**
* Creates a new table in the metastore and adds an entry to the metadata cache to
- * lazily load the new metadata on the next access. Re-throws any Hive Meta Store
+ * lazily load the new metadata on the next access. If this is a managed Kudu table,
+ * the table is also created in the Kudu storage engine. Re-throws any HMS or Kudu
* exceptions encountered during the create.
*/
private boolean createTable(TCreateTableParams params, TDdlExecResponse response)
@@ -1351,9 +1394,8 @@ public class CatalogOpExecutor {
Preconditions.checkNotNull(params);
TableName tableName = TableName.fromThrift(params.getTable_name());
Preconditions.checkState(tableName != null && tableName.isFullyQualified());
- Preconditions.checkState(params.getColumns() != null &&
- params.getColumns().size() > 0,
- "Null or empty column list given as argument to Catalog.createTable");
+ Preconditions.checkState(params.getColumns() != null,
+ "Null column list given as argument to Catalog.createTable");
if (params.if_not_exists &&
catalog_.containsTable(tableName.getDb(), tableName.getTbl())) {
@@ -1362,11 +1404,161 @@ public class CatalogOpExecutor {
response.getResult().setVersion(catalog_.getCatalogVersion());
return false;
}
- org.apache.hadoop.hive.metastore.api.Table tbl =
- createMetaStoreTable(params);
+ org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
LOG.debug(String.format("Creating table %s", tableName));
- return createTable(tbl, params.if_not_exists, params.getCache_op(),
- params.getDistribute_by(), response);
+ if (KuduTable.isKuduTable(tbl)) return createKuduTable(tbl, params, response);
+ Preconditions.checkState(params.getColumns().size() > 0,
+ "Empty column list given as argument to Catalog.createTable");
+ return createTable(tbl, params.if_not_exists, params.getCache_op(), response);
+ }
+
+ /**
+ * Utility function that creates a hive.metastore.api.Table object based on the given
+ * TCreateTableParams. The table properties are copied, so updating the parameters of
+ * the returned table (e.g. adding the comment) does not modify 'params'.
+ * TODO: Extract metastore object creation utility functions into a helper class.
+ */
+ public static org.apache.hadoop.hive.metastore.api.Table createMetaStoreTable(
+ TCreateTableParams params) {
+ Preconditions.checkNotNull(params);
+ TableName tableName = TableName.fromThrift(params.getTable_name());
+ org.apache.hadoop.hive.metastore.api.Table tbl =
+ new org.apache.hadoop.hive.metastore.api.Table();
+ tbl.setDbName(tableName.getDb());
+ tbl.setTableName(tableName.getTbl());
+ tbl.setOwner(params.getOwner());
+ if (params.isSetTable_properties()) {
+ tbl.setParameters(new HashMap<String, String>(params.getTable_properties()));
+ } else {
+ tbl.setParameters(new HashMap<String, String>());
+ }
+
+ if (params.getComment() != null) {
+ tbl.getParameters().put("comment", params.getComment());
+ }
+ if (params.is_external) {
+ tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
+ tbl.putToParameters("EXTERNAL", "TRUE");
+ } else {
+ tbl.setTableType(TableType.MANAGED_TABLE.toString());
+ }
+
+ tbl.setSd(createSd(params));
+ if (params.getPartition_columns() != null) {
+ // Add in any partition keys that were specified
+ tbl.setPartitionKeys(buildFieldSchemaList(params.getPartition_columns()));
+ } else {
+ tbl.setPartitionKeys(new ArrayList<FieldSchema>());
+ }
+ return tbl;
+ }
+
+ private static StorageDescriptor createSd(TCreateTableParams params) {
+ StorageDescriptor sd = HiveStorageDescriptorFactory.createSd(
+ params.getFile_format(), RowFormat.fromThrift(params.getRow_format()));
+ if (params.isSetSerde_properties()) {
+ // Merge into a map owned by the SerDe info instead of aliasing 'params'.
+ if (sd.getSerdeInfo().getParameters() == null) {
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ }
+ sd.getSerdeInfo().getParameters().putAll(params.getSerde_properties());
+ }
+
+ if (params.getLocation() != null) sd.setLocation(params.getLocation());
+
+ // Add in all the columns
+ sd.setCols(buildFieldSchemaList(params.getColumns()));
+ return sd;
+ }
+
+ /**
+ * Creates a new Kudu table. The Kudu table is first created in the Kudu storage engine
+ * (only applicable to managed tables), then in HMS and finally in the catalog cache.
+ * Failure to add the table in HMS results in the managed table being dropped from Kudu.
+ * 'response' is populated with the results of this operation. Returns true if a new
+ * table was created as part of this call, false otherwise.
+ */
+ private boolean createKuduTable(org.apache.hadoop.hive.metastore.api.Table newTable,
+ TCreateTableParams params, TDdlExecResponse response) throws ImpalaException {
+ Preconditions.checkState(KuduTable.isKuduTable(newTable));
+ if (Table.isExternalTable(newTable)) {
+ KuduCatalogOpExecutor.populateColumnsFromKudu(newTable);
+ } else {
+ KuduCatalogOpExecutor.createManagedTable(newTable, params);
+ }
+ // Add the table to the HMS and the catalog cache. Acquire metastoreDdlLock_ to
+ // ensure the atomicity of these operations.
+ synchronized (metastoreDdlLock_) {
+ try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+ msClient.getHiveClient().createTable(newTable);
+ } catch (Exception e) {
+ // Error creating the table in HMS, drop the managed table from Kudu. Only HMS
+ // failures are rolled back, so HMS never references a dropped Kudu table.
+ try {
+ if (!Table.isExternalTable(newTable)) {
+ KuduCatalogOpExecutor.dropTable(newTable, false);
+ }
+ } catch (Exception logged) {
+ String kuduTableName = newTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
+ LOG.error(String.format("Failed to drop Kudu table '%s'", kuduTableName),
+ logged);
+ throw new ImpalaRuntimeException(String.format("Failed to create the table " +
+ "'%s' in the Metastore and the newly created Kudu table '%s' could not be " +
+ "dropped. The log contains more information.", newTable.getTableName(),
+ kuduTableName), e);
+ }
+ // With IF NOT EXISTS, an already-existing HMS table is not an error.
+ if (e instanceof AlreadyExistsException && params.if_not_exists) return false;
+ throw new ImpalaRuntimeException(
+ String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
+ }
+ // Add the table to the catalog cache
+ Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
+ addTableToCatalogUpdate(newTbl, response.result);
+ }
+ return true;
+ }
+
+ /**
+ * Creates a new non-Kudu table or view, first in HMS and then in the catalog cache.
+ * HDFS caching is set up if 'cacheOp' is non-null and requests it. 'response' is
+ * populated with the results. Returns true if a new table was created, or false if
+ * HMS reports it already exists and 'if_not_exists' is set; other HMS errors throw.
+ */
+ private boolean createTable(org.apache.hadoop.hive.metastore.api.Table newTable,
+ boolean if_not_exists, THdfsCachingOp cacheOp, TDdlExecResponse response)
+ throws ImpalaException {
+ Preconditions.checkState(!KuduTable.isKuduTable(newTable));
+ synchronized (metastoreDdlLock_) {
+ try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+ msClient.getHiveClient().createTable(newTable);
+ // If this table should be cached, and the table location was not specified by
+ // the user, an extra step is needed to read the table to find the location.
+ if (cacheOp != null && cacheOp.isSet_cached() &&
+ newTable.getSd().getLocation() == null) {
+ newTable = msClient.getHiveClient().getTable(
+ newTable.getDbName(), newTable.getTableName());
+ }
+ } catch (Exception e) {
+ if (e instanceof AlreadyExistsException && if_not_exists) return false;
+ throw new ImpalaRuntimeException(
+ String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
+ }
+
+ // Submit the cache request and update the table metadata.
+ if (cacheOp != null && cacheOp.isSet_cached()) {
+ short replication = cacheOp.isSetReplication() ? cacheOp.getReplication() :
+ JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
+ long id = HdfsCachingUtil.submitCacheTblDirective(newTable,
+ cacheOp.getCache_pool_name(), replication);
+ catalog_.watchCacheDirs(Lists.<Long>newArrayList(id),
+ new TTableName(newTable.getDbName(), newTable.getTableName()));
+ applyAlterTable(newTable);
+ }
+ Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
+ addTableToCatalogUpdate(newTbl, response.result);
+ }
+ return true;
}
/**
@@ -1392,7 +1584,7 @@ public class CatalogOpExecutor {
new org.apache.hadoop.hive.metastore.api.Table();
setViewAttributes(params, view);
LOG.debug(String.format("Creating view %s", tableName));
- createTable(view, params.if_not_exists, null, null, response);
+ createTable(view, params.if_not_exists, null, response);
}
/**
@@ -1423,6 +1615,8 @@ public class CatalogOpExecutor {
Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl());
org.apache.hadoop.hive.metastore.api.Table tbl =
srcTable.getMetaStoreTable().deepCopy();
+ Preconditions.checkState(!KuduTable.isKuduTable(tbl),
+ "CREATE TABLE LIKE is not supported for Kudu tables.");
tbl.setDbName(tblName.getDb());
tbl.setTableName(tblName.getTbl());
tbl.setOwner(params.getOwner());
@@ -1460,7 +1654,7 @@ public class CatalogOpExecutor {
tbl.getSd().setLocation(params.getLocation());
if (fileFormat != null) {
setStorageDescriptorFileFormat(tbl.getSd(), fileFormat);
- } else if (fileFormat == null && srcTable instanceof View) {
+ } else if (srcTable instanceof View) {
// Here, source table is a view which has no input format. So to be
// consistent with CREATE TABLE, default input format is assumed to be
// TEXT unless otherwise specified.
@@ -1469,85 +1663,7 @@ public class CatalogOpExecutor {
// Set the row count of this table to unknown.
tbl.putToParameters(StatsSetupConst.ROW_COUNT, "-1");
LOG.debug(String.format("Creating table %s LIKE %s", tblName, srcTblName));
- createTable(tbl, params.if_not_exists, null, null, response);
- }
-
- /**
- * Creates a new table in the HMS. If ifNotExists=true, no error will be thrown if
- * the table already exists, otherwise an exception will be thrown.
- * Accepts an optional 'cacheOp' param, which if specified will cache the table's
- * HDFS location according to the 'cacheOp' spec after creation.
- * Stores details of the operations (such as the resulting catalog version) in
- * 'response' output parameter.
- * Returns true if a new table was created as part of this call, false otherwise.
- */
- private boolean createTable(org.apache.hadoop.hive.metastore.api.Table newTable,
- boolean ifNotExists, THdfsCachingOp cacheOp, List<TDistributeParam> distribute_by,
- TDdlExecResponse response)
- throws ImpalaException {
- synchronized (metastoreDdlLock_) {
-
- try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
- msClient.getHiveClient().createTable(newTable);
- // If this table should be cached, and the table location was not specified by
- // the user, an extra step is needed to read the table to find the location.
- if (cacheOp != null && cacheOp.isSet_cached() &&
- newTable.getSd().getLocation() == null) {
- newTable = msClient.getHiveClient().getTable(newTable.getDbName(),
- newTable.getTableName());
- }
- } catch (AlreadyExistsException e) {
- if (!ifNotExists) {
- throw new ImpalaRuntimeException(
- String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
- }
- LOG.debug(String.format("Ignoring '%s' when creating table %s.%s because " +
- "IF NOT EXISTS was specified.", e,
- newTable.getDbName(), newTable.getTableName()));
- return false;
- } catch (TException e) {
- throw new ImpalaRuntimeException(
- String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
- }
-
- // Forward the operation to a specific storage backend. If the operation fails,
- // delete the just created hive table to avoid inconsistencies.
- try {
- createDdlDelegate(newTable).setDistributeParams(distribute_by).createTable();
- } catch (ImpalaRuntimeException e) {
- try (MetaStoreClient c = catalog_.getMetaStoreClient()) {
- c.getHiveClient().dropTable(newTable.getDbName(), newTable.getTableName(),
- false, ifNotExists);
- } catch (Exception hE) {
- throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
- "dropTable"), hE);
- }
- throw e;
- }
-
- // Submit the cache request and update the table metadata.
- if (cacheOp != null && cacheOp.isSet_cached()) {
- short replication = cacheOp.isSetReplication() ? cacheOp.getReplication() :
- JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
- long id = HdfsCachingUtil.submitCacheTblDirective(newTable,
- cacheOp.getCache_pool_name(), replication);
- catalog_.watchCacheDirs(Lists.<Long>newArrayList(id),
- new TTableName(newTable.getDbName(), newTable.getTableName()));
- applyAlterTable(newTable);
- }
- Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
- addTableToCatalogUpdate(newTbl, response.result);
- }
- return true;
- }
-
- /**
- * Instantiate the appropriate DDL delegate for the table. If no known delegate is
- * available for the table, returns a UnsupportedOpDelegate instance.
- */
- private DdlDelegate createDdlDelegate(org.apache.hadoop.hive.metastore.api.Table tab) {
- if (KuduDdlDelegate.canHandle(tab)) return new KuduDdlDelegate(tab);
- return new UnsupportedOpDelegate();
+ createTable(tbl, params.if_not_exists, null, response);
}
/**
@@ -1967,6 +2083,9 @@ public class CatalogOpExecutor {
switch (params.getTarget()) {
case TBL_PROPERTY:
msTbl.getParameters().putAll(properties);
+ if (KuduTable.isKuduTable(msTbl)) {
+ KuduCatalogOpExecutor.validateKuduTblExists(msTbl);
+ }
break;
case SERDE_PROPERTY:
msTbl.getSd().getSerdeInfo().getParameters().putAll(properties);
@@ -2120,7 +2239,6 @@ public class CatalogOpExecutor {
Preconditions.checkNotNull(cacheOp);
Preconditions.checkNotNull(params.getPartition_spec());
// Alter partition params.
- final String RUNTIME_FILTER_FORMAT = "apply %s on %s";
TableName tableName = tbl.getTableName();
HdfsPartition partition = catalog_.getHdfsPartition(
tableName.getDb(), tableName.getTbl(), params.getPartition_spec());
@@ -2535,16 +2653,6 @@ public class CatalogOpExecutor {
}
/**
- * Returns a deep copy of the metastore.api.Table object for the given TableName.
- */
- private org.apache.hadoop.hive.metastore.api.Table getMetaStoreTable(
- TableName tableName) throws CatalogException {
- Preconditions.checkState(tableName != null && tableName.isFullyQualified());
- return getExistingTable(tableName.getDb(), tableName.getTbl())
- .getMetaStoreTable().deepCopy();
- }
-
- /**
* Returns the metastore.api.Table object from the Hive Metastore for an existing
* fully loaded table.
*/
@@ -2608,7 +2716,7 @@ public class CatalogOpExecutor {
/**
* Calculates the next transient_lastDdlTime value.
*/
- private static long calculateDdlTime(
+ public static long calculateDdlTime(
org.apache.hadoop.hive.metastore.api.Table msTbl) {
long existingLastDdlTime = CatalogServiceCatalog.getLastDdlTime(msTbl);
long currentTime = System.currentTimeMillis() / 1000;
@@ -2617,63 +2725,6 @@ public class CatalogOpExecutor {
}
/**
- * Utility function that creates a hive.metastore.api.Table object based on the given
- * TCreateTableParams.
- * TODO: Extract metastore object creation utility functions into a separate
- * helper/factory class.
- */
- public static org.apache.hadoop.hive.metastore.api.Table
- createMetaStoreTable(TCreateTableParams params) {
- Preconditions.checkNotNull(params);
- TableName tableName = TableName.fromThrift(params.getTable_name());
- org.apache.hadoop.hive.metastore.api.Table tbl =
- new org.apache.hadoop.hive.metastore.api.Table();
- tbl.setDbName(tableName.getDb());
- tbl.setTableName(tableName.getTbl());
- tbl.setOwner(params.getOwner());
- if (params.isSetTable_properties()) {
- tbl.setParameters(params.getTable_properties());
- } else {
- tbl.setParameters(new HashMap<String, String>());
- }
-
- if (params.getComment() != null) {
- tbl.getParameters().put("comment", params.getComment());
- }
- if (params.is_external) {
- tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
- tbl.putToParameters("EXTERNAL", "TRUE");
- } else {
- tbl.setTableType(TableType.MANAGED_TABLE.toString());
- }
-
- StorageDescriptor sd = HiveStorageDescriptorFactory.createSd(
- params.getFile_format(), RowFormat.fromThrift(params.getRow_format()));
-
- if (params.isSetSerde_properties()) {
- if (sd.getSerdeInfo().getParameters() == null) {
- sd.getSerdeInfo().setParameters(params.getSerde_properties());
- } else {
- sd.getSerdeInfo().getParameters().putAll(params.getSerde_properties());
- }
- }
-
- if (params.getLocation() != null) {
- sd.setLocation(params.getLocation());
- }
- // Add in all the columns
- sd.setCols(buildFieldSchemaList(params.getColumns()));
- tbl.setSd(sd);
- if (params.getPartition_columns() != null) {
- // Add in any partition keys that were specified
- tbl.setPartitionKeys(buildFieldSchemaList(params.getPartition_columns()));
- } else {
- tbl.setPartitionKeys(new ArrayList<FieldSchema>());
- }
- return tbl;
- }
-
- /**
* Executes a TResetMetadataRequest and returns the result as a
* TResetMetadataResponse. Based on the request parameters, this operation
* may do one of three things:
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 00a3d93..6d535fd 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -170,9 +170,11 @@ public class Frontend {
private final AtomicReference<AuthorizationChecker> authzChecker_;
private final ScheduledExecutorService policyReader_ =
Executors.newScheduledThreadPool(1);
+ private final String defaultKuduMasterHosts_;
- public Frontend(AuthorizationConfig authorizationConfig) {
- this(authorizationConfig, new ImpaladCatalog());
+ public Frontend(AuthorizationConfig authorizationConfig,
+ String defaultKuduMasterHosts) {
+ this(authorizationConfig, new ImpaladCatalog(defaultKuduMasterHosts));
}
/**
@@ -181,6 +183,7 @@ public class Frontend {
public Frontend(AuthorizationConfig authorizationConfig, ImpaladCatalog catalog) {
authzConfig_ = authorizationConfig;
impaladCatalog_ = catalog;
+ defaultKuduMasterHosts_ = catalog.getDefaultKuduMasterHosts();
authzChecker_ = new AtomicReference<AuthorizationChecker>(
new AuthorizationChecker(authzConfig_, impaladCatalog_.getAuthPolicy()));
// If authorization is enabled, reload the policy on a regular basis.
@@ -226,7 +229,7 @@ public class Frontend {
// If this is not a delta, this update should replace the current
// Catalog contents so create a new catalog and populate it.
- if (!req.is_delta) catalog = new ImpaladCatalog();
+ if (!req.is_delta) catalog = new ImpaladCatalog(defaultKuduMasterHosts_);
TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index fc8deaf..7d0af54 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -102,7 +102,7 @@ public class JniCatalog {
try {
catalog_.reset();
} catch (CatalogException e) {
- LOG.error("Error initialializing Catalog. Please run 'invalidate metadata'", e);
+ LOG.error("Error initializing Catalog. Please run 'invalidate metadata'", e);
}
catalogOpExecutor_ = new CatalogOpExecutor(catalog_);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/JniFrontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 07d6ec6..0d502e5 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -117,7 +117,8 @@ public class JniFrontend {
*/
public JniFrontend(boolean lazy, String serverName, String authorizationPolicyFile,
String sentryConfigFile, String authPolicyProviderClass, int impalaLogLevel,
- int otherLogLevel, boolean allowAuthToLocal) throws InternalException {
+ int otherLogLevel, boolean allowAuthToLocal, String defaultKuduMasterHosts)
+ throws InternalException {
BackendConfig.setAuthToLocal(allowAuthToLocal);
GlogAppender.Install(TLogLevel.values()[impalaLogLevel],
TLogLevel.values()[otherLogLevel]);
@@ -136,7 +137,7 @@ public class JniFrontend {
}
LOG.info(JniUtil.getJavaVersion());
- frontend_ = new Frontend(authConfig);
+ frontend_ = new Frontend(authConfig, defaultKuduMasterHosts);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
new file mode 100644
index 0000000..bd6d0fe
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.service;
+
+import java.lang.NumberFormatException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.impala.analysis.ToSqlUtils;
+import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.TableNotFoundException;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TCreateTableParams;
+import org.apache.impala.thrift.TDistributeParam;
+import org.apache.impala.util.KuduUtil;
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.PartialRow;
+import org.apache.log4j.Logger;
+
+/**
+ * This is a helper for the CatalogOpExecutor to provide Kudu related DDL functionality
+ * such as creating and dropping tables from Kudu.
+ */
+public class KuduCatalogOpExecutor {
+ public static final Logger LOG = Logger.getLogger(KuduCatalogOpExecutor.class);
+
+ /**
+ * Create a table in Kudu with a schema equivalent to the schema stored in 'msTbl'.
+ * Throws an exception if 'msTbl' represents an external table or if the table couldn't
+ * be created in Kudu.
+ */
+ static void createManagedTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
+ TCreateTableParams params) throws ImpalaRuntimeException {
+ Preconditions.checkState(!Table.isExternalTable(msTbl));
+ String kuduTableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
+ String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+ LOG.debug(String.format("Creating table '%s' in master '%s'", kuduTableName,
+ masterHosts));
+ try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) {
+ // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity.
+ // (see KUDU-1710).
+ if (kudu.tableExists(kuduTableName)) {
+ if (params.if_not_exists) return;
+ throw new ImpalaRuntimeException(String.format(
+ "Table '%s' already exists in Kudu.", kuduTableName));
+ }
+ Schema schema = createTableSchema(msTbl, params);
+ CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
+ kudu.createTable(kuduTableName, schema, tableOpts);
+ } catch (Exception e) {
+ throw new ImpalaRuntimeException(String.format("Error creating table '%s'",
+ kuduTableName), e);
+ }
+ }
+
+ /**
+ * Creates the schema of a new Kudu table.
+ */
+ private static Schema createTableSchema(
+ org.apache.hadoop.hive.metastore.api.Table msTbl, TCreateTableParams params)
+ throws ImpalaRuntimeException {
+ Set<String> keyColNames = new HashSet<>(params.getPrimary_key_column_names());
+ List<FieldSchema> fieldSchemas = msTbl.getSd().getCols();
+ List<ColumnSchema> colSchemas = new ArrayList<>(fieldSchemas.size());
+ for (FieldSchema fieldSchema : fieldSchemas) {
+ Type type = Type.parseColumnType(fieldSchema.getType());
+ Preconditions.checkState(type != null);
+ org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(type);
+ // Create the actual column and check if the column is a key column
+ ColumnSchemaBuilder csb =
+ new ColumnSchemaBuilder(fieldSchema.getName(), kuduType);
+ boolean isKeyCol = keyColNames.contains(fieldSchema.getName());
+ csb.key(isKeyCol);
+ csb.nullable(!isKeyCol);
+ colSchemas.add(csb.build());
+ }
+ return new Schema(colSchemas);
+ }
+
+ /**
+ * Builds the table options of a new Kudu table.
+ */
+ private static CreateTableOptions buildTableOptions(
+ org.apache.hadoop.hive.metastore.api.Table msTbl,
+ TCreateTableParams params, Schema schema) throws ImpalaRuntimeException {
+ CreateTableOptions tableOpts = new CreateTableOptions();
+ // Set the distribution schemes
+ List<TDistributeParam> distributeParams = params.getDistribute_by();
+ if (distributeParams != null) {
+ boolean hasRangePartitioning = false;
+ for (TDistributeParam distParam : distributeParams) {
+ if (distParam.isSetBy_hash_param()) {
+ Preconditions.checkState(!distParam.isSetBy_range_param());
+ tableOpts.addHashPartitions(distParam.getBy_hash_param().getColumns(),
+ distParam.getBy_hash_param().getNum_buckets());
+ } else {
+ Preconditions.checkState(distParam.isSetBy_range_param());
+ hasRangePartitioning = true;
+ tableOpts.setRangePartitionColumns(
+ distParam.getBy_range_param().getColumns());
+ for (PartialRow partialRow :
+ KuduUtil.parseSplits(schema, distParam.getBy_range_param())) {
+ tableOpts.addSplitRow(partialRow);
+ }
+ }
+ }
+ // If no range-based distribution is specified in a CREATE TABLE statement, Kudu
+ // generates one by default that includes all the primary key columns. To prevent
+ // this from happening, explicitly set the range partition columns to be
+ // an empty list.
+ if (!hasRangePartitioning) {
+ tableOpts.setRangePartitionColumns(Collections.<String>emptyList());
+ }
+ }
+
+ // Set the number of table replicas, if specified.
+ String replication = msTbl.getParameters().get(KuduTable.KEY_TABLET_REPLICAS);
+ if (!Strings.isNullOrEmpty(replication)) {
+ try {
+ int r = Integer.parseInt(replication);
+ Preconditions.checkState(r > 0);
+ tableOpts.setNumReplicas(r);
+ } catch (NumberFormatException e) {
+ throw new ImpalaRuntimeException(String.format("Invalid number of table " +
+ "replicas specified: '%s'", replication), e);
+ }
+ }
+ return tableOpts;
+ }
+
+ /**
+ * Drops the table in Kudu. If the table does not exist and 'ifExists' is false, a
+ * TableNotFoundException is thrown. If the table exists and could not be dropped,
+ * an ImpalaRuntimeException is thrown.
+ */
+ static void dropTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
+ boolean ifExists) throws ImpalaRuntimeException, TableNotFoundException {
+ Preconditions.checkState(!Table.isExternalTable(msTbl));
+ String tableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
+ String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+ LOG.debug(String.format("Dropping table '%s' from master '%s'", tableName,
+ masterHosts));
+ try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) {
+ Preconditions.checkState(!Strings.isNullOrEmpty(tableName));
+ // TODO: The IF EXISTS case should be handled by Kudu to ensure atomicity.
+ // (see KUDU-1710).
+ if (kudu.tableExists(tableName)) {
+ kudu.deleteTable(tableName);
+ } else if (!ifExists) {
+ throw new TableNotFoundException(String.format(
+ "Table '%s' does not exist in Kudu master(s) '%s'.", tableName, masterHosts));
+ }
+ } catch (Exception e) {
+ throw new ImpalaRuntimeException(String.format("Error dropping table '%s'",
+ tableName), e);
+ }
+ }
+
+ /**
+ * Reads the column definitions from a Kudu table and populates 'msTbl' with
+ * an equivalent schema. Throws an exception if any errors are encountered.
+ */
+ public static void populateColumnsFromKudu(
+ org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException {
+ org.apache.hadoop.hive.metastore.api.Table msTblCopy = msTbl.deepCopy();
+ List<FieldSchema> cols = msTblCopy.getSd().getCols();
+ String kuduTableName = msTblCopy.getParameters().get(KuduTable.KEY_TABLE_NAME);
+ Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
+ String masterHosts = msTblCopy.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+ LOG.debug(String.format("Loading schema of table '%s' from master '%s'",
+ kuduTableName, masterHosts));
+ try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) {
+ if (!kudu.tableExists(kuduTableName)) {
+ throw new ImpalaRuntimeException(String.format("Table does not exist in Kudu: " +
+ "'%s'", kuduTableName));
+ }
+ org.apache.kudu.client.KuduTable kuduTable = kudu.openTable(kuduTableName);
+ // Replace the columns in the Metastore table with the columns from the recently
+ // accessed Kudu schema.
+ cols.clear();
+ for (ColumnSchema colSchema : kuduTable.getSchema().getColumns()) {
+ Type type = KuduUtil.toImpalaType(colSchema.getType());
+ cols.add(new FieldSchema(colSchema.getName(), type.toSql().toLowerCase(), null));
+ }
+ } catch (Exception e) {
+ throw new ImpalaRuntimeException(String.format("Error loading schema of table " +
+ "'%s'", kuduTableName), e);
+ }
+ List<FieldSchema> newCols = msTbl.getSd().getCols();
+ newCols.clear();
+ newCols.addAll(cols);
+ }
+
+ /**
+ * Validates the table properties of a Kudu table. It checks that the specified master
+ * addresses point to valid Kudu masters and that the table exists.
+ * Throws an ImpalaRuntimeException if this is not the case.
+ */
+ public static void validateKuduTblExists(
+ org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException {
+ String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+ Preconditions.checkState(!Strings.isNullOrEmpty(masterHosts));
+ String kuduTableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
+ Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
+ try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) {
+ kudu.tableExists(kuduTableName);
+ } catch (Exception e) {
+ throw new ImpalaRuntimeException(String.format("Kudu table '%s' does not exist " +
+ "on master '%s'", kuduTableName, masterHosts), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/util/KuduUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index b9f8653..a679032 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -18,14 +18,15 @@
package org.apache.impala.util;
import java.io.StringReader;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonReader;
+import org.apache.impala.catalog.Catalog;
import org.apache.impala.catalog.ScalarType;
+import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.thrift.TDistributeByRangeParam;
import org.apache.impala.thrift.TRangeLiteral;
@@ -33,48 +34,17 @@ import org.apache.impala.thrift.TRangeLiteralList;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
-import static org.apache.impala.catalog.Type.parseColumnType;
import static java.lang.String.format;
public class KuduUtil {
private static final String SPLIT_KEYS_ERROR_MESSAGE = "Error parsing splits keys.";
-
- /**
- * Compare the schema of a HMS table and a Kudu table. Returns true if both tables have
- * a matching schema.
- */
- public static boolean compareSchema(Table msTable, KuduTable kuduTable)
- throws ImpalaRuntimeException {
- List<FieldSchema> msFields = msTable.getSd().getCols();
- List<ColumnSchema> kuduFields = kuduTable.getSchema().getColumns();
- if (msFields.size() != kuduFields.size()) return false;
-
- HashMap<String, ColumnSchema> kuduFieldMap = Maps.newHashMap();
- for (ColumnSchema kuduField : kuduFields) {
- kuduFieldMap.put(kuduField.getName().toUpperCase(), kuduField);
- }
-
- for (FieldSchema msField : msFields) {
- ColumnSchema kuduField = kuduFieldMap.get(msField.getName().toUpperCase());
- if (kuduField == null
- || fromImpalaType(parseColumnType(msField.getType())) != kuduField.getType()) {
- return false;
- }
- }
-
- return true;
- }
+ private static final String KUDU_TABLE_NAME_PREFIX = "impala::";
/**
* Parses split keys from statements.
@@ -145,10 +115,9 @@ public class KuduUtil {
/**
* Sets the value in 'key' at 'pos', given the json representation.
*/
- private static void setKey(Type type, JsonArray array, int pos, PartialRow key)
- throws ImpalaRuntimeException {
+ private static void setKey(org.apache.kudu.Type type, JsonArray array, int pos,
+ PartialRow key) throws ImpalaRuntimeException {
switch (type) {
- case BOOL: key.addBoolean(pos, array.getBoolean(pos)); break;
case INT8: key.addByte(pos, (byte) array.getInt(pos)); break;
case INT16: key.addShort(pos, (short) array.getInt(pos)); break;
case INT32: key.addInt(pos, array.getInt(pos)); break;
@@ -163,13 +132,9 @@ public class KuduUtil {
/**
* Sets the value in 'key' at 'pos', given the range literal.
*/
- private static void setKey(Type type, TRangeLiteral literal, int pos, String colName,
- PartialRow key) throws ImpalaRuntimeException {
+ private static void setKey(org.apache.kudu.Type type, TRangeLiteral literal, int pos,
+ String colName, PartialRow key) throws ImpalaRuntimeException {
switch (type) {
- case BOOL:
- checkCorrectType(literal.isSetBool_literal(), type, colName, literal);
- key.addBoolean(pos, literal.isBool_literal());
- break;
case INT8:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
key.addByte(pos, (byte) literal.getInt_literal());
@@ -200,8 +165,8 @@ public class KuduUtil {
* If correctType is true, returns. Otherwise throws a formatted error message
* indicating problems with the type of the literal of the range literal.
*/
- private static void checkCorrectType(boolean correctType, Type t, String colName,
- TRangeLiteral literal) throws ImpalaRuntimeException {
+ private static void checkCorrectType(boolean correctType, org.apache.kudu.Type t,
+ String colName, TRangeLiteral literal) throws ImpalaRuntimeException {
if (correctType) return;
throw new ImpalaRuntimeException(
format("Expected %s literal for column '%s' got '%s'", t.getName(), colName,
@@ -220,11 +185,24 @@ public class KuduUtil {
return Lists.newArrayList(Splitter.on(",").trimResults().split(cols.toLowerCase()));
}
+  /**
+   * Returns true if 'type' is an integer or a string type; these are the Impala types
+   * this check accepts for Kudu key columns.
+   */
+  public static boolean isSupportedKeyType(org.apache.impala.catalog.Type type) {
+    if (type.isIntegerType()) return true;
+    return type.isStringType();
+  }
+
+  /**
+   * Builds the Kudu table name used when creating a table for which no custom Kudu
+   * name was given: KUDU_TABLE_NAME_PREFIX followed by "<database>.<table>" of the
+   * Metastore table.
+   */
+  public static String getDefaultCreateKuduTableName(String metastoreDbName,
+      String metastoreTableName) {
+    return new StringBuilder(KUDU_TABLE_NAME_PREFIX)
+        .append(metastoreDbName)
+        .append('.')
+        .append(metastoreTableName)
+        .toString();
+  }
+
/**
* Converts a given Impala catalog type to the Kudu type. Throws an exception if the
* type cannot be converted.
*/
- public static Type fromImpalaType(org.apache.impala.catalog.Type t)
+ public static org.apache.kudu.Type fromImpalaType(Type t)
throws ImpalaRuntimeException {
if (!t.isScalarType()) {
throw new ImpalaRuntimeException(format(
@@ -232,16 +210,16 @@ public class KuduUtil {
}
ScalarType s = (ScalarType) t;
switch (s.getPrimitiveType()) {
- case TINYINT: return Type.INT8;
- case SMALLINT: return Type.INT16;
- case INT: return Type.INT32;
- case BIGINT: return Type.INT64;
- case BOOLEAN: return Type.BOOL;
- case CHAR: return Type.STRING;
- case STRING: return Type.STRING;
- case VARCHAR: return Type.STRING;
- case DOUBLE: return Type.DOUBLE;
- case FLOAT: return Type.FLOAT;
+ case TINYINT: return org.apache.kudu.Type.INT8;
+ case SMALLINT: return org.apache.kudu.Type.INT16;
+ case INT: return org.apache.kudu.Type.INT32;
+ case BIGINT: return org.apache.kudu.Type.INT64;
+ case BOOLEAN: return org.apache.kudu.Type.BOOL;
+ case CHAR: return org.apache.kudu.Type.STRING;
+ case STRING: return org.apache.kudu.Type.STRING;
+ case VARCHAR: return org.apache.kudu.Type.STRING;
+ case DOUBLE: return org.apache.kudu.Type.DOUBLE;
+ case FLOAT: return org.apache.kudu.Type.FLOAT;
/* Fall through below */
case INVALID_TYPE:
case NULL_TYPE:
@@ -256,11 +234,27 @@ public class KuduUtil {
}
}
+  /**
+   * Converts a Kudu column type to the corresponding Impala catalog type.
+   * Throws an ImpalaRuntimeException for any Kudu type not mapped below.
+   */
+  public static Type toImpalaType(org.apache.kudu.Type t)
+      throws ImpalaRuntimeException {
+    Type impalaType;
+    switch (t) {
+      case INT8: impalaType = Type.TINYINT; break;
+      case INT16: impalaType = Type.SMALLINT; break;
+      case INT32: impalaType = Type.INT; break;
+      case INT64: impalaType = Type.BIGINT; break;
+      case FLOAT: impalaType = Type.FLOAT; break;
+      case DOUBLE: impalaType = Type.DOUBLE; break;
+      case BOOL: impalaType = Type.BOOLEAN; break;
+      case STRING: impalaType = Type.STRING; break;
+      default:
+        throw new ImpalaRuntimeException(String.format(
+            "Kudu type %s is not supported in Impala", t));
+    }
+    return impalaType;
+  }
+
/**
* Returns the string value of the RANGE literal.
*/
static String toString(TRangeLiteral l) throws ImpalaRuntimeException {
- if (l.isSetBool_literal()) return String.valueOf(l.bool_literal);
if (l.isSetString_literal()) return String.valueOf(l.string_literal);
if (l.isSetInt_literal()) return String.valueOf(l.int_literal);
throw new ImpalaRuntimeException("Unsupported type for RANGE literal.");