Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/22 05:33:34 UTC
[07/14] incubator-impala git commit: IMPALA-3719: Simplify CREATE TABLE statements with Kudu tables
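A before/after sketch of the DDL this change targets (table, column, and host
names are illustrative, and the old-style property keys are assumptions based
on the KuduTable.KEY_* constants referenced in the patch, not text quoted from
it):

  -- Old style: Kudu specified through storage handler table properties.
  CREATE TABLE old_style (
    id BIGINT,
    name STRING
  )
  TBLPROPERTIES (
    'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
    'kudu.table_name' = 'old_style',
    'kudu.master_addresses' = 'kudu-master:7051',
    'kudu.key_columns' = 'id'
  );

  -- New style after this patch: dedicated PRIMARY KEY, DISTRIBUTE BY, and
  -- STORED AS KUDU syntax; the storage handler property is now rejected.
  CREATE TABLE new_style (
    id BIGINT,
    name STRING,
    PRIMARY KEY (id)
  )
  DISTRIBUTE BY HASH (id) INTO 16 BUCKETS
  STORED AS KUDU;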
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 3acb1a3..73173cb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -17,23 +17,12 @@
package org.apache.impala.analysis;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaParseException;
-import org.apache.hadoop.fs.permission.FsAction;
-
-import org.apache.impala.authorization.Privilege;
-import org.apache.impala.catalog.HdfsStorageDescriptor;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.RowFormat;
import org.apache.impala.common.AnalysisException;
-import org.apache.impala.common.FileSystemUtil;
-import org.apache.impala.thrift.TAccessEvent;
-import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TCreateTableParams;
import org.apache.impala.thrift.THdfsFileFormat;
import org.apache.impala.thrift.TTableName;
@@ -41,113 +30,80 @@ import org.apache.impala.util.AvroSchemaConverter;
import org.apache.impala.util.AvroSchemaParser;
import org.apache.impala.util.AvroSchemaUtils;
import org.apache.impala.util.KuduUtil;
-import org.apache.impala.util.MetaStoreUtil;
+
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
/**
* Represents a CREATE TABLE statement.
*/
public class CreateTableStmt extends StatementBase {
- private List<ColumnDef> columnDefs_;
- private final String comment_;
- private final boolean isExternal_;
- private final boolean ifNotExists_;
- private final THdfsFileFormat fileFormat_;
- private final ArrayList<ColumnDef> partitionColDefs_;
- private final RowFormat rowFormat_;
- private TableName tableName_;
- private final Map<String, String> tblProperties_;
- private final Map<String, String> serdeProperties_;
- private final HdfsCachingOp cachingOp_;
- private HdfsUri location_;
- private final List<DistributeParam> distributeParams_;
-
- // Set during analysis
+
+ @VisibleForTesting
+ final static String KUDU_STORAGE_HANDLER_ERROR_MESSAGE = "Kudu tables must be"
+ + " specified using 'STORED AS KUDU' without using the storage handler table"
+ + " property.";
+
+ // Table parameters specified in a CREATE TABLE statement
+ private final TableDef tableDef_;
+
+ // Table owner. Set during analysis
private String owner_;
- /**
- * Builds a CREATE TABLE statement
- * @param tableName - Name of the new table
- * @param columnDefs - List of column definitions for the table
- * @param partitionColumnDefs - List of partition column definitions for the table
- * @param isExternal - If true, the table's data will be preserved if dropped.
- * @param comment - Comment to attach to the table
- * @param rowFormat - Custom row format of the table. Use RowFormat.DEFAULT_ROW_FORMAT
- * to specify default row format.
- * @param fileFormat - File format of the table
- * @param location - The HDFS location of where the table data will stored.
- * @param cachingOp - The HDFS caching op that should be applied to this table.
- * @param ifNotExists - If true, no errors are thrown if the table already exists.
- * @param tblProperties - Optional map of key/values to persist with table metadata.
- * @param serdeProperties - Optional map of key/values to persist with table serde
- * metadata.
- */
- public CreateTableStmt(TableName tableName, List<ColumnDef> columnDefs,
- List<ColumnDef> partitionColumnDefs, boolean isExternal, String comment,
- RowFormat rowFormat, THdfsFileFormat fileFormat, HdfsUri location,
- HdfsCachingOp cachingOp, boolean ifNotExists, Map<String, String> tblProperties,
- Map<String, String> serdeProperties, List<DistributeParam> distributeParams) {
- Preconditions.checkNotNull(columnDefs);
- Preconditions.checkNotNull(partitionColumnDefs);
- Preconditions.checkNotNull(fileFormat);
- Preconditions.checkNotNull(rowFormat);
- Preconditions.checkNotNull(tableName);
-
- columnDefs_ = Lists.newArrayList(columnDefs);
- comment_ = comment;
- isExternal_ = isExternal;
- ifNotExists_ = ifNotExists;
- fileFormat_ = fileFormat;
- location_ = location;
- cachingOp_ = cachingOp;
- partitionColDefs_ = Lists.newArrayList(partitionColumnDefs);
- rowFormat_ = rowFormat;
- tableName_ = tableName;
- tblProperties_ = tblProperties;
- serdeProperties_ = serdeProperties;
- unescapeProperties(tblProperties_);
- unescapeProperties(serdeProperties_);
- distributeParams_ = distributeParams;
+ public CreateTableStmt(TableDef tableDef) {
+ Preconditions.checkNotNull(tableDef);
+ tableDef_ = tableDef;
}
/**
* Copy c'tor.
*/
- public CreateTableStmt(CreateTableStmt other) {
- columnDefs_ = Lists.newArrayList(other.columnDefs_);
- comment_ = other.comment_;
- isExternal_ = other.isExternal_;
- ifNotExists_ = other.ifNotExists_;
- fileFormat_ = other.fileFormat_;
- location_ = other.location_;
- cachingOp_ = other.cachingOp_;
- partitionColDefs_ = Lists.newArrayList(other.partitionColDefs_);
- rowFormat_ = other.rowFormat_;
- tableName_ = other.tableName_;
- tblProperties_ = other.tblProperties_;
- serdeProperties_ = other.serdeProperties_;
- distributeParams_ = other.distributeParams_;
+ CreateTableStmt(CreateTableStmt other) {
+ this(other.tableDef_);
+ owner_ = other.owner_;
}
@Override
public CreateTableStmt clone() { return new CreateTableStmt(this); }
- public String getTbl() { return tableName_.getTbl(); }
- public TableName getTblName() { return tableName_; }
- public List<ColumnDef> getColumnDefs() { return columnDefs_; }
- public List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; }
- public String getComment() { return comment_; }
- public boolean isExternal() { return isExternal_; }
- public boolean getIfNotExists() { return ifNotExists_; }
- public HdfsUri getLocation() { return location_; }
- public void setLocation(HdfsUri location) { this.location_ = location; }
- public THdfsFileFormat getFileFormat() { return fileFormat_; }
- public RowFormat getRowFormat() { return rowFormat_; }
- public Map<String, String> getTblProperties() { return tblProperties_; }
- public Map<String, String> getSerdeProperties() { return serdeProperties_; }
+ public String getTbl() { return getTblName().getTbl(); }
+ public TableName getTblName() { return tableDef_.getTblName(); }
+ public boolean getIfNotExists() { return tableDef_.getIfNotExists(); }
+ public List<ColumnDef> getColumnDefs() { return tableDef_.getColumnDefs(); }
+ private void setColumnDefs(List<ColumnDef> colDefs) {
+ getColumnDefs().clear();
+ getColumnDefs().addAll(colDefs);
+ }
+ private List<ColumnDef> getPrimaryKeyColumnDefs() {
+ return tableDef_.getPrimaryKeyColumnDefs();
+ }
+ public boolean isExternal() { return tableDef_.isExternal(); }
+ public List<ColumnDef> getPartitionColumnDefs() {
+ return tableDef_.getPartitionColumnDefs();
+ }
+ public List<DistributeParam> getDistributeParams() {
+ return tableDef_.getDistributeParams();
+ }
+ public String getComment() { return tableDef_.getComment(); }
+ Map<String, String> getTblProperties() { return tableDef_.getTblProperties(); }
+ private HdfsCachingOp getCachingOp() { return tableDef_.getCachingOp(); }
+ public HdfsUri getLocation() { return tableDef_.getLocation(); }
+ Map<String, String> getSerdeProperties() { return tableDef_.getSerdeProperties(); }
+ public THdfsFileFormat getFileFormat() { return tableDef_.getFileFormat(); }
+ RowFormat getRowFormat() { return tableDef_.getRowFormat(); }
+
+ // Only exposed for ToSqlUtils. Returns the list of primary keys declared by the user
+ // at the table level. Note that primary keys may also be declared in column
+ // definitions; those are not included here (they are stored in the ColumnDefs).
+ List<String> getTblPrimaryKeyColumnNames() {
+ return tableDef_.getPrimaryKeyColumnNames();
+ }
/**
* Can only be called after analysis, returns the owner of this table (the user from
@@ -164,7 +120,7 @@ public class CreateTableStmt extends StatementBase {
*/
public String getDb() {
Preconditions.checkState(isAnalyzed());
- return tableName_.getDb();
+ return getTblName().getDb();
}
@Override
@@ -173,240 +129,246 @@ public class CreateTableStmt extends StatementBase {
public TCreateTableParams toThrift() {
TCreateTableParams params = new TCreateTableParams();
params.setTable_name(new TTableName(getDb(), getTbl()));
- for (ColumnDef col: getColumnDefs()) {
- params.addToColumns(col.toThrift());
- }
+ List<org.apache.impala.thrift.TColumn> tColumns = Lists.newArrayList();
+ for (ColumnDef col: getColumnDefs()) tColumns.add(col.toThrift());
+ params.setColumns(tColumns);
for (ColumnDef col: getPartitionColumnDefs()) {
params.addToPartition_columns(col.toThrift());
}
params.setOwner(getOwner());
params.setIs_external(isExternal());
- params.setComment(comment_);
- params.setLocation(location_ == null ? null : location_.toString());
- if (cachingOp_ != null) params.setCache_op(cachingOp_.toThrift());
- params.setRow_format(rowFormat_.toThrift());
- params.setFile_format(fileFormat_);
+ params.setComment(getComment());
+ params.setLocation(getLocation() == null ? null : getLocation().toString());
+ if (getCachingOp() != null) params.setCache_op(getCachingOp().toThrift());
+ if (getRowFormat() != null) params.setRow_format(getRowFormat().toThrift());
+ params.setFile_format(getFileFormat());
params.setIf_not_exists(getIfNotExists());
- if (tblProperties_ != null) params.setTable_properties(tblProperties_);
- if (serdeProperties_ != null) params.setSerde_properties(serdeProperties_);
- if (distributeParams_ != null) {
- for (DistributeParam d : distributeParams_) {
- params.addToDistribute_by(d.toThrift());
- }
+ params.setTable_properties(getTblProperties());
+ params.setSerde_properties(getSerdeProperties());
+ for (DistributeParam d: getDistributeParams()) {
+ params.addToDistribute_by(d.toThrift());
+ }
+ for (ColumnDef pkColDef: getPrimaryKeyColumnDefs()) {
+ params.addToPrimary_key_column_names(pkColDef.getColName());
}
+
return params;
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
super.analyze(analyzer);
- Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
- tableName_ = analyzer.getFqTableName(tableName_);
- tableName_.analyze();
owner_ = analyzer.getUser().getName();
-
- MetaStoreUtil.checkShortPropertyMap("Property", tblProperties_);
- MetaStoreUtil.checkShortPropertyMap("Serde property", serdeProperties_);
-
- if (analyzer.dbContainsTable(tableName_.getDb(), tableName_.getTbl(),
- Privilege.CREATE) && !ifNotExists_) {
- throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG + tableName_);
- }
-
- analyzer.addAccessEvent(new TAccessEvent(tableName_.toString(),
- TCatalogObjectType.TABLE, Privilege.CREATE.toString()));
-
- // Only Avro tables can have empty column defs because they can infer them from
- // the Avro schema.
- if (columnDefs_.isEmpty() && fileFormat_ != THdfsFileFormat.AVRO) {
+ tableDef_.analyze(analyzer);
+ analyzeKuduFormat(analyzer);
+ // Avro tables can have empty column defs because they can infer them from the Avro
+ // schema. Likewise for external Kudu tables, the schema can be read from Kudu.
+ if (getColumnDefs().isEmpty() && getFileFormat() != THdfsFileFormat.AVRO
+ && getFileFormat() != THdfsFileFormat.KUDU) {
throw new AnalysisException("Table requires at least 1 column");
}
-
- if (location_ != null) {
- location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
- }
-
- analyzeRowFormat(analyzer);
-
- // Check that all the column names are valid and unique.
- analyzeColumnDefs(analyzer);
-
- if (getTblProperties() != null && KuduTable.KUDU_STORAGE_HANDLER.equals(
- getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER))) {
- analyzeKuduTable(analyzer);
- } else if (distributeParams_ != null) {
- throw new AnalysisException("Only Kudu tables can use DISTRIBUTE BY clause.");
- }
-
- if (fileFormat_ == THdfsFileFormat.AVRO) {
- columnDefs_ = analyzeAvroSchema(analyzer);
- if (columnDefs_.isEmpty()) {
+ if (getFileFormat() == THdfsFileFormat.AVRO) {
+ setColumnDefs(analyzeAvroSchema(analyzer));
+ if (getColumnDefs().isEmpty()) {
throw new AnalysisException(
"An Avro table requires column definitions or an Avro schema.");
}
- AvroSchemaUtils.setFromSerdeComment(columnDefs_);
- analyzeColumnDefs(analyzer);
+ AvroSchemaUtils.setFromSerdeComment(getColumnDefs());
}
+ }
- if (cachingOp_ != null) {
- cachingOp_.analyze(analyzer);
- if (cachingOp_.shouldCache() && location_ != null &&
- !FileSystemUtil.isPathCacheable(location_.getPath())) {
- throw new AnalysisException(String.format("Location '%s' cannot be cached. " +
- "Please retry without caching: CREATE TABLE %s ... UNCACHED",
- location_.toString(), tableName_));
+ /**
+ * Analyzes the parameters of a CREATE TABLE ... STORED AS KUDU statement. Also checks
+ * if Kudu-specific properties and parameters are specified for non-Kudu tables.
+ */
+ private void analyzeKuduFormat(Analyzer analyzer) throws AnalysisException {
+ if (getFileFormat() != THdfsFileFormat.KUDU) {
+ if (KuduTable.KUDU_STORAGE_HANDLER.equals(
+ getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER))) {
+ throw new AnalysisException(KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
+ }
+ AnalysisUtils.throwIfNotEmpty(getDistributeParams(),
+ "Only Kudu tables can use the DISTRIBUTE BY clause.");
+ if (hasPrimaryKey()) {
+ throw new AnalysisException("Only Kudu tables can specify a PRIMARY KEY.");
}
+ return;
}
- // Analyze 'skip.header.line.format' property.
- if (tblProperties_ != null) {
- AlterTableSetTblProperties.analyzeSkipHeaderLineCount(tblProperties_);
+ analyzeKuduTableProperties(analyzer);
+ if (isExternal()) {
+ analyzeExternalKuduTableParams();
+ } else {
+ analyzeManagedKuduTableParams(analyzer);
}
}
- private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException {
- Byte fieldDelim = analyzeRowFormatValue(rowFormat_.getFieldDelimiter());
- Byte lineDelim = analyzeRowFormatValue(rowFormat_.getLineDelimiter());
- Byte escapeChar = analyzeRowFormatValue(rowFormat_.getEscapeChar());
- if (fileFormat_ == THdfsFileFormat.TEXT) {
- if (fieldDelim == null) fieldDelim = HdfsStorageDescriptor.DEFAULT_FIELD_DELIM;
- if (lineDelim == null) lineDelim = HdfsStorageDescriptor.DEFAULT_LINE_DELIM;
- if (escapeChar == null) escapeChar = HdfsStorageDescriptor.DEFAULT_ESCAPE_CHAR;
- if (fieldDelim != null && lineDelim != null && fieldDelim.equals(lineDelim)) {
- throw new AnalysisException("Field delimiter and line delimiter have same " +
- "value: byte " + fieldDelim);
- }
- if (fieldDelim != null && escapeChar != null && fieldDelim.equals(escapeChar)) {
- analyzer.addWarning("Field delimiter and escape character have same value: " +
- "byte " + fieldDelim + ". Escape character will be ignored");
- }
- if (lineDelim != null && escapeChar != null && lineDelim.equals(escapeChar)) {
- analyzer.addWarning("Line delimiter and escape character have same value: " +
- "byte " + lineDelim + ". Escape character will be ignored");
+ /**
+ * Analyzes and checks table properties which are common to both managed and external
+ * Kudu tables.
+ */
+ private void analyzeKuduTableProperties(Analyzer analyzer) throws AnalysisException {
+ if (getTblProperties().containsKey(KuduTable.KEY_STORAGE_HANDLER)) {
+ throw new AnalysisException(KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
+ }
+ getTblProperties().put(KuduTable.KEY_STORAGE_HANDLER, KuduTable.KUDU_STORAGE_HANDLER);
+
+ String masterHosts = getTblProperties().get(KuduTable.KEY_MASTER_HOSTS);
+ if (Strings.isNullOrEmpty(masterHosts)) {
+ masterHosts = analyzer.getCatalog().getDefaultKuduMasterHosts();
+ if (masterHosts.isEmpty()) {
+ throw new AnalysisException(String.format(
+ "Table property '%s' is required when the impalad startup flag " +
+ "-kudu_master_hosts is not used.", KuduTable.KEY_MASTER_HOSTS));
}
+ getTblProperties().put(KuduTable.KEY_MASTER_HOSTS, masterHosts);
}
+
+ // TODO: Find out what is creating a directory in HDFS and stop doing that. Kudu
+ // tables shouldn't have HDFS dirs.
+ // https://issues.cloudera.org/browse/IMPALA-3570
+ AnalysisUtils.throwIfNotNull(getCachingOp(),
+ "A Kudu table cannot be cached in HDFS.");
+ AnalysisUtils.throwIfNotNull(getLocation(), "LOCATION cannot be specified for a " +
+ "Kudu table.");
+ AnalysisUtils.throwIfNotEmpty(tableDef_.getPartitionColumnDefs(),
+ "PARTITIONED BY cannot be used in Kudu tables.");
}
/**
- * Analyzes columnDefs_ and partitionColDefs_ checking whether all column
- * names are unique.
+ * Analyzes and checks parameters specified for external Kudu tables.
*/
- private void analyzeColumnDefs(Analyzer analyzer) throws AnalysisException {
- Set<String> colNames = Sets.newHashSet();
- for (ColumnDef colDef: columnDefs_) {
- colDef.analyze();
- if (!colNames.add(colDef.getColName().toLowerCase())) {
- throw new AnalysisException("Duplicate column name: " + colDef.getColName());
- }
+ private void analyzeExternalKuduTableParams() throws AnalysisException {
+ AnalysisUtils.throwIfNull(getTblProperties().get(KuduTable.KEY_TABLE_NAME),
+ String.format("Table property %s must be specified when creating " +
+ "an external Kudu table.", KuduTable.KEY_TABLE_NAME));
+ if (hasPrimaryKey()
+ || getTblProperties().containsKey(KuduTable.KEY_KEY_COLUMNS)) {
+ throw new AnalysisException("Primary keys cannot be specified for an external " +
+ "Kudu table");
}
- for (ColumnDef colDef: partitionColDefs_) {
- colDef.analyze();
- if (!colDef.getType().supportsTablePartitioning()) {
- throw new AnalysisException(
- String.format("Type '%s' is not supported as partition-column type " +
- "in column: %s", colDef.getType().toSql(), colDef.getColName()));
+ AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_TABLET_REPLICAS),
+ String.format("Table property '%s' cannot be used with an external Kudu table.",
+ KuduTable.KEY_TABLET_REPLICAS));
+ AnalysisUtils.throwIfNotEmpty(getColumnDefs(),
+ "Columns cannot be specified with an external Kudu table.");
+ AnalysisUtils.throwIfNotEmpty(getDistributeParams(),
+ "DISTRIBUTE BY cannot be used with an external Kudu table.");
+ }
+
+ /**
+ * Analyzes and checks parameters specified for managed Kudu tables.
+ */
+ private void analyzeManagedKuduTableParams(Analyzer analyzer) throws AnalysisException {
+ // If no Kudu table name is specified in tblproperties, generate one using the
+ // current database as a prefix to avoid conflicts in Kudu.
+ if (!getTblProperties().containsKey(KuduTable.KEY_TABLE_NAME)) {
+ getTblProperties().put(KuduTable.KEY_TABLE_NAME,
+ KuduUtil.getDefaultCreateKuduTableName(getDb(), getTbl()));
+ }
+ AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_KEY_COLUMNS),
+ String.format("PRIMARY KEY must be used instead of the table property '%s'.",
+ KuduTable.KEY_KEY_COLUMNS));
+ if (!hasPrimaryKey()) {
+ throw new AnalysisException("A primary key is required for a Kudu table.");
+ }
+ String tabletReplicas = getTblProperties().get(KuduTable.KEY_TABLET_REPLICAS);
+ if (tabletReplicas != null) {
+ Integer r = Ints.tryParse(tabletReplicas);
+ if (r == null) {
+ throw new AnalysisException(String.format(
+ "Table property '%s' must be an integer.", KuduTable.KEY_TABLET_REPLICAS));
+ }
+ if (r <= 0) {
+ throw new AnalysisException("Number of tablet replicas must be greater than " +
+ "zero. Given number of replicas is: " + r.toString());
}
- if (!colNames.add(colDef.getColName().toLowerCase())) {
- throw new AnalysisException("Duplicate column name: " + colDef.getColName());
+ }
+
+ if (!getDistributeParams().isEmpty()) {
+ analyzeDistributeParams(analyzer);
+ } else {
+ throw new AnalysisException("Table distribution must be specified for " +
+ "managed Kudu tables.");
+ }
+ }
+
+ /**
+ * Analyzes the distribution schemes specified in the CREATE TABLE statement.
+ */
+ private void analyzeDistributeParams(Analyzer analyzer) throws AnalysisException {
+ Preconditions.checkState(getFileFormat() == THdfsFileFormat.KUDU);
+ Map<String, ColumnDef> pkColDefsByName =
+ ColumnDef.mapByColumnNames(getPrimaryKeyColumnDefs());
+ for (DistributeParam distributeParam: getDistributeParams()) {
+ // If no column names were specified in this distribution scheme, use all the
+ // primary key columns.
+ if (!distributeParam.hasColumnNames()) {
+ distributeParam.setColumnNames(pkColDefsByName.keySet());
}
+ distributeParam.setPkColumnDefMap(pkColDefsByName);
+ distributeParam.analyze(analyzer);
}
}
/**
- * Analyzes the Avro schema and compares it with the columnDefs_ to detect
+ * Checks if a primary key is specified in a CREATE TABLE stmt. Should only be called
+ * after tableDef_ has been analyzed.
+ */
+ private boolean hasPrimaryKey() {
+ Preconditions.checkState(tableDef_.isAnalyzed());
+ return !tableDef_.getPrimaryKeyColumnDefs().isEmpty();
+ }
+
+ /**
+ * Analyzes the Avro schema and compares it with getColumnDefs() to detect
* inconsistencies. Returns a list of column descriptors that should be
- * used for creating the table (possibly identical to columnDefs_).
+ * used for creating the table (possibly identical to getColumnDefs()).
*/
- private List<ColumnDef> analyzeAvroSchema(Analyzer analyzer)
- throws AnalysisException {
- Preconditions.checkState(fileFormat_ == THdfsFileFormat.AVRO);
+ private List<ColumnDef> analyzeAvroSchema(Analyzer analyzer) throws AnalysisException {
+ Preconditions.checkState(getFileFormat() == THdfsFileFormat.AVRO);
// Look for the schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
// taking precedence.
List<Map<String, String>> schemaSearchLocations = Lists.newArrayList();
- schemaSearchLocations.add(serdeProperties_);
- schemaSearchLocations.add(tblProperties_);
- String avroSchema = null;
- List<ColumnDef> avroCols = null; // parsed from avroSchema
+ schemaSearchLocations.add(getSerdeProperties());
+ schemaSearchLocations.add(getTblProperties());
+ String avroSchema;
+ List<ColumnDef> avroCols; // parsed from avroSchema
try {
avroSchema = AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
if (avroSchema == null) {
// No Avro schema was explicitly set in the serde or table properties, so infer
// the Avro schema from the column definitions.
Schema inferredSchema = AvroSchemaConverter.convertColumnDefs(
- columnDefs_, tableName_.toString());
+ getColumnDefs(), getTblName().toString());
avroSchema = inferredSchema.toString();
}
if (Strings.isNullOrEmpty(avroSchema)) {
throw new AnalysisException("Avro schema is null or empty: " +
- tableName_.toString());
+ getTblName().toString());
}
avroCols = AvroSchemaParser.parse(avroSchema);
} catch (SchemaParseException e) {
throw new AnalysisException(String.format(
- "Error parsing Avro schema for table '%s': %s", tableName_.toString(),
+ "Error parsing Avro schema for table '%s': %s", getTblName().toString(),
e.getMessage()));
}
Preconditions.checkNotNull(avroCols);
- // Analyze the Avro schema to detect inconsistencies with the columnDefs_.
+ // Analyze the Avro schema to detect inconsistencies with getColumnDefs().
// In case of inconsistencies, the column defs are ignored in favor of the Avro
// schema for simplicity and, in particular, to enable COMPUTE STATS (IMPALA-1104).
StringBuilder warning = new StringBuilder();
List<ColumnDef> reconciledColDefs =
- AvroSchemaUtils.reconcileSchemas(columnDefs_, avroCols, warning);
+ AvroSchemaUtils.reconcileSchemas(getColumnDefs(), avroCols, warning);
if (warning.length() > 0) analyzer.addWarning(warning.toString());
return reconciledColDefs;
}
- private void analyzeKuduTable(Analyzer analyzer) throws AnalysisException {
- // Validate that Kudu table is correctly specified.
- if (!KuduTable.tableParamsAreValid(getTblProperties())) {
- throw new AnalysisException("Kudu table is missing parameters " +
- String.format("in table properties. Please verify if %s, %s, and %s are "
- + "present and have valid values.",
- KuduTable.KEY_TABLE_NAME, KuduTable.KEY_MASTER_ADDRESSES,
- KuduTable.KEY_KEY_COLUMNS));
- }
-
- // Kudu table cannot be a cached table
- if (cachingOp_ != null) {
- throw new AnalysisException("A Kudu table cannot be cached in HDFS.");
- }
-
- if (distributeParams_ != null) {
- if (isExternal_) {
- throw new AnalysisException(
- "The DISTRIBUTE BY clause may not be specified for external tables.");
- }
-
- List<String> keyColumns = KuduUtil.parseKeyColumnsAsList(
- getTblProperties().get(KuduTable.KEY_KEY_COLUMNS));
- for (DistributeParam d : distributeParams_) {
- // If the columns are not set, default to all key columns
- if (d.getColumns() == null) d.setColumns(keyColumns);
- d.analyze(analyzer);
- }
- } else if (!isExternal_) {
- throw new AnalysisException(
- "A data distribution must be specified using the DISTRIBUTE BY clause.");
- }
- }
-
- private Byte analyzeRowFormatValue(String value) throws AnalysisException {
- if (value == null) return null;
- Byte byteVal = HdfsStorageDescriptor.parseDelim(value);
- if (byteVal == null) {
- throw new AnalysisException("ESCAPED BY values and LINE/FIELD " +
- "terminators must be specified as a single character or as a decimal " +
- "value in the range [-128:127]: " + value);
- }
- return byteVal;
- }
-
/**
* Unescapes all values in the property map.
*/
- public static void unescapeProperties(Map<String, String> propertyMap) {
+ static void unescapeProperties(Map<String, String> propertyMap) {
if (propertyMap == null) return;
for (Map.Entry<String, String> kv : propertyMap.entrySet()) {
propertyMap.put(kv.getKey(),
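The rewritten analyzeKuduFormat() and analyze*KuduTableParams() methods above
enforce the new rules. A hedged sketch of statements they reject or accept
(error texts quoted from the code; 'kudu.table_name' is the assumed value of
KuduTable.KEY_TABLE_NAME):

  -- Rejected: DISTRIBUTE BY on a non-Kudu table.
  CREATE TABLE t1 (id BIGINT)
  DISTRIBUTE BY HASH (id) INTO 4 BUCKETS STORED AS PARQUET;
  -- "Only Kudu tables can use the DISTRIBUTE BY clause."

  -- Rejected: managed Kudu table without a distribution scheme.
  CREATE TABLE t2 (id BIGINT, PRIMARY KEY (id)) STORED AS KUDU;
  -- "Table distribution must be specified for managed Kudu tables."

  -- Accepted: external Kudu table mapping an existing Kudu table. The schema
  -- is read from Kudu, so no columns, primary keys, or DISTRIBUTE BY clause
  -- may be specified.
  CREATE EXTERNAL TABLE t3
  STORED AS KUDU
  TBLPROPERTIES ('kudu.table_name' = 'existing_kudu_table');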
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java b/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
index 319fe50..34bed86 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
@@ -17,19 +17,20 @@
package org.apache.impala.analysis;
-import java.math.BigDecimal;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.thrift.TDistributeByHashParam;
import org.apache.impala.thrift.TDistributeByRangeParam;
import org.apache.impala.thrift.TDistributeParam;
-import org.apache.impala.thrift.TDistributeType;
import org.apache.impala.thrift.TRangeLiteral;
import org.apache.impala.thrift.TRangeLiteralList;
+import org.apache.impala.util.KuduUtil;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
/**
@@ -50,16 +51,16 @@ public class DistributeParam implements ParseNode {
/**
* Creates a DistributeParam partitioned by hash.
*/
- public static DistributeParam createHashParam(List<String> cols, BigDecimal buckets) {
- return new DistributeParam(Type.HASH, cols, buckets);
+ public static DistributeParam createHashParam(List<String> cols, int buckets) {
+ return new DistributeParam(Type.HASH, cols, buckets, null);
}
/**
* Creates a DistributeParam partitioned by range.
*/
public static DistributeParam createRangeParam(List<String> cols,
- ArrayList<ArrayList<LiteralExpr>> splitRows) {
- return new DistributeParam(Type.RANGE, cols, splitRows);
+ List<List<LiteralExpr>> splitRows) {
+ return new DistributeParam(Type.RANGE, cols, NO_BUCKETS, splitRows);
}
private static final int NO_BUCKETS = -1;
@@ -69,131 +70,159 @@ public class DistributeParam implements ParseNode {
*/
public enum Type {
HASH, RANGE
- };
+ }
+
+ // May be empty, indicating that all key columns in the table should be used.
+ private final List<String> colNames_ = Lists.newArrayList();
- private List<String> columns_;
+ // Map of primary key column names to the associated column definitions. Must be set
+ // before the call to analyze().
+ private Map<String, ColumnDef> pkColumnDefByName_;
+ // Distribution type
private final Type type_;
// Only relevant for hash partitioning, -1 otherwise
- private final int num_buckets_;
+ private final int numBuckets_;
// Only relevant for range partitioning, null otherwise
- private final ArrayList<ArrayList<LiteralExpr>> splitRows_;
-
- // Set in analyze()
- private TDistributeByRangeParam rangeParam_;
-
- private DistributeParam(Type t, List<String> cols, BigDecimal buckets) {
- type_ = t;
- columns_ = cols;
- num_buckets_ = buckets.intValue();
- splitRows_ = null;
- }
+ private final List<List<LiteralExpr>> splitRows_;
- private DistributeParam(Type t, List<String> cols,
- ArrayList<ArrayList<LiteralExpr>> splitRows) {
+ private DistributeParam(Type t, List<String> colNames, int buckets,
+ List<List<LiteralExpr>> splitRows) {
type_ = t;
- columns_ = cols;
+ for (String name: colNames) colNames_.add(name.toLowerCase());
+ numBuckets_ = buckets;
splitRows_ = splitRows;
- num_buckets_ = NO_BUCKETS;
}
- /**
- * TODO Refactor the logic below to analyze 'columns_'. This analysis should output
- * a vector of column types that would then be used during the analysis of the split
- * rows.
- */
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
- if (type_ == Type.HASH && num_buckets_ <= 1) {
- throw new AnalysisException(String.format(
- "Number of buckets in DISTRIBUTE BY clause '%s' must be larger than 1.",
- toSql()));
- } else if (type_ == Type.RANGE) {
- // Creating the thrift structure simultaneously checks for semantic errors
- rangeParam_ = new TDistributeByRangeParam();
- rangeParam_.setColumns(columns_);
-
- for (ArrayList<LiteralExpr> splitRow : splitRows_) {
- TRangeLiteralList list = new TRangeLiteralList();
- if (splitRow.size() != columns_.size()) {
+ Preconditions.checkState(!colNames_.isEmpty());
+ Preconditions.checkNotNull(pkColumnDefByName_);
+ Preconditions.checkState(!pkColumnDefByName_.isEmpty());
+ // Validate the columns specified in the DISTRIBUTE BY clause
+ for (String colName: colNames_) {
+ if (!pkColumnDefByName_.containsKey(colName)) {
+ throw new AnalysisException(String.format("Column '%s' in '%s' is not a key " +
+ "column. Only key columns can be used in DISTRIBUTE BY.", colName, toSql()));
+ }
+ }
+
+ if (type_ == Type.RANGE) {
+ for (List<LiteralExpr> splitRow : splitRows_) {
+ if (splitRow.size() != colNames_.size()) {
throw new AnalysisException(String.format(
"SPLIT ROWS has different size than number of projected key columns: %d. "
- + "Split row: %s", columns_.size(), splitRowToString(splitRow)));
+ + "Split row: %s", colNames_.size(), splitRowToString(splitRow)));
}
- for (LiteralExpr expr : splitRow) {
+ for (int i = 0; i < splitRow.size(); ++i) {
+ LiteralExpr expr = splitRow.get(i);
+ ColumnDef colDef = pkColumnDefByName_.get(colNames_.get(i));
+ org.apache.impala.catalog.Type colType = colDef.getType();
+ Preconditions.checkState(KuduUtil.isSupportedKeyType(colType));
expr.analyze(analyzer);
- TRangeLiteral literal = new TRangeLiteral();
- if (expr instanceof NumericLiteral) {
- NumericLiteral num = (NumericLiteral) expr;
- if (num.getType().isDecimal() || num.getType().isFloatingPointType()) {
- throw new AnalysisException("Only integral and string values allowed for" +
- " split rows.");
- } else {
- literal.setInt_literal(num.getIntValue());
- }
- } else if (expr instanceof StringLiteral) {
- StringLiteral string = (StringLiteral) expr;
- literal.setString_literal(string.getStringValue());
- } else if (expr instanceof BoolLiteral) {
- BoolLiteral bool = (BoolLiteral) expr;
- literal.setBool_literal(bool.getValue());
- } else {
- throw new AnalysisException(String.format("Split row value is not supported: "
- + "%s (Type: %s).", expr.getStringValue(), expr.getType().toSql()));
+ org.apache.impala.catalog.Type exprType = expr.getType();
+ if (exprType.isNull()) {
+ throw new AnalysisException("Split values cannot be NULL. Split row: " +
+ splitRowToString(splitRow));
+ }
+ if (!org.apache.impala.catalog.Type.isImplicitlyCastable(exprType, colType,
+ true)) {
+ throw new AnalysisException(String.format("Split value %s (type: %s) is " +
+ "not type compatible with column '%s' (type: %s).", expr.toSql(),
+ exprType, colDef.getColName(), colType.toSql()));
}
- list.addToValues(literal);
}
- rangeParam_.addToSplit_rows(list);
}
}
}
@Override
public String toSql() {
- if (num_buckets_ == NO_BUCKETS) {
- List<String> splitRowStrings = Lists.newArrayList();
- for (ArrayList<LiteralExpr> splitRow : splitRows_) {
- splitRowStrings.add(splitRowToString(splitRow));
- }
- return String.format("RANGE(%s) INTO RANGES(%s)", Joiner.on(", ").join(columns_),
- Joiner.on(", ").join(splitRowStrings));
+ StringBuilder builder = new StringBuilder(type_.toString());
+ if (!colNames_.isEmpty()) {
+ builder.append(" (");
+ Joiner.on(", ").appendTo(builder, colNames_).append(")");
+ }
+ if (type_ == Type.HASH) {
+ builder.append(" INTO ");
+ Preconditions.checkState(numBuckets_ != NO_BUCKETS);
+ builder.append(numBuckets_).append(" BUCKETS");
} else {
- return String.format("HASH(%s) INTO %d BUCKETS", Joiner.on(", ").join(columns_),
- num_buckets_);
+ builder.append(" SPLIT ROWS (");
+ if (splitRows_ == null) {
+ builder.append("...");
+ } else {
+ for (List<LiteralExpr> splitRow: splitRows_) {
+ builder.append(splitRowToString(splitRow));
+ }
+ }
+ builder.append(")");
}
+ return builder.toString();
}
- private String splitRowToString(ArrayList<LiteralExpr> splitRow) {
- StringBuilder builder = new StringBuilder();
- builder.append("(");
- List<String> rangeElementStrings = Lists.newArrayList();
- for (LiteralExpr rangeElement : splitRow) {
- rangeElementStrings.add(rangeElement.toSql());
+ @Override
+ public String toString() { return toSql(); }
+
+ private String splitRowToString(List<LiteralExpr> splitRow) {
+ StringBuilder builder = new StringBuilder("(");
+ for (LiteralExpr expr: splitRow) {
+ if (builder.length() > 1) builder.append(", ");
+ builder.append(expr.toSql());
}
- builder.append(Joiner.on(", ").join(rangeElementStrings));
- builder.append(")");
- return builder.toString();
+ return builder.append(")").toString();
}
- TDistributeParam toThrift() {
+ public TDistributeParam toThrift() {
TDistributeParam result = new TDistributeParam();
+ // TODO: Add a validate() function to ensure the validity of distribute params.
if (type_ == Type.HASH) {
TDistributeByHashParam hash = new TDistributeByHashParam();
- hash.setNum_buckets(num_buckets_);
- hash.setColumns(columns_);
+ Preconditions.checkState(numBuckets_ != NO_BUCKETS);
+ hash.setNum_buckets(numBuckets_);
+ hash.setColumns(colNames_);
result.setBy_hash_param(hash);
} else {
Preconditions.checkState(type_ == Type.RANGE);
-
- result.setBy_range_param(rangeParam_);
+ TDistributeByRangeParam rangeParam = new TDistributeByRangeParam();
+ rangeParam.setColumns(colNames_);
+ if (splitRows_ == null) {
+ result.setBy_range_param(rangeParam);
+ return result;
+ }
+ for (List<LiteralExpr> splitRow : splitRows_) {
+ TRangeLiteralList list = new TRangeLiteralList();
+ for (int i = 0; i < splitRow.size(); ++i) {
+ LiteralExpr expr = splitRow.get(i);
+ TRangeLiteral literal = new TRangeLiteral();
+ if (expr instanceof NumericLiteral) {
+ literal.setInt_literal(((NumericLiteral)expr).getIntValue());
+ } else {
+ String exprValue = expr.getStringValue();
+ Preconditions.checkState(!Strings.isNullOrEmpty(exprValue));
+ literal.setString_literal(exprValue);
+ }
+ list.addToValues(literal);
+ }
+ rangeParam.addToSplit_rows(list);
+ }
+ result.setBy_range_param(rangeParam);
}
return result;
}
- public List<String> getColumns() { return columns_; }
- public void setColumns(List<String> cols) { columns_ = cols; }
- public Type getType_() { return type_; }
- public int getNumBuckets() { return num_buckets_; }
+ void setPkColumnDefMap(Map<String, ColumnDef> pkColumnDefByName) {
+ pkColumnDefByName_ = pkColumnDefByName;
+ }
+
+ boolean hasColumnNames() { return !colNames_.isEmpty(); }
+
+ void setColumnNames(Collection<String> colNames) {
+ Preconditions.checkState(colNames_.isEmpty());
+ colNames_.addAll(colNames);
+ }
+
+ public Type getType() { return type_; }
}
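Per analyze() above, every column named in a distribution scheme must be a
primary key column, and split values must be non-NULL and type-compatible with
their column. A sketch combining both scheme types (names illustrative;
multiple schemes are comma-separated, matching how ToSqlUtils joins them
below):

  CREATE TABLE metrics (
    host STRING,
    metric STRING,
    value DOUBLE,
    PRIMARY KEY (host, metric)
  )
  DISTRIBUTE BY HASH (host) INTO 4 BUCKETS,
                RANGE (metric) SPLIT ROWS (('cpu'), ('mem'))
  STORED AS KUDU;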
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index 24b8417..28de1a8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -190,7 +190,7 @@ public abstract class ModifyStmt extends StatementBase {
// cast result expressions to the correct type of the referenced slot of the
// target table
- int keyColumnsOffset = table_.getKuduKeyColumnNames().size();
+ int keyColumnsOffset = table_.getPrimaryKeyColumnNames().size();
for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) {
sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo(
assignments_.get(i - keyColumnsOffset).first.getType()));
@@ -225,7 +225,7 @@ public abstract class ModifyStmt extends StatementBase {
}
// Add the key columns as slot refs
- for (String k : table_.getKuduKeyColumnNames()) {
+ for (String k : table_.getPrimaryKeyColumnNames()) {
ArrayList<String> path = Path.createRawPath(targetTableRef_.getUniqueAlias(), k);
SlotRef ref = new SlotRef(path);
ref.analyze(analyzer);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
new file mode 100644
index 0000000..4d3ed80
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Represents the PARTITIONED BY and DISTRIBUTE BY clauses of a DDL statement.
+ * TODO: Reconsider this class when we add support for new range partitioning syntax (see
+ * IMPALA-3724).
+ */
+class TableDataLayout {
+
+ private final List<ColumnDef> partitionColDefs_;
+ private final List<DistributeParam> distributeParams_;
+
+ private TableDataLayout(List<ColumnDef> partitionColumnDefs,
+ List<DistributeParam> distributeParams) {
+ partitionColDefs_ = partitionColumnDefs;
+ distributeParams_ = distributeParams;
+ }
+
+ static TableDataLayout createPartitionedLayout(List<ColumnDef> partitionColumnDefs) {
+ return new TableDataLayout(partitionColumnDefs,
+ Lists.<DistributeParam>newArrayList());
+ }
+
+ static TableDataLayout createDistributedLayout(List<DistributeParam> distributeParams) {
+ return new TableDataLayout(Lists.<ColumnDef>newArrayList(), distributeParams);
+ }
+
+ static TableDataLayout createEmptyLayout() {
+ return new TableDataLayout(Lists.<ColumnDef>newArrayList(),
+ Lists.<DistributeParam>newArrayList());
+ }
+
+ List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; }
+ List<DistributeParam> getDistributeParams() { return distributeParams_; }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/TableDef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
new file mode 100644
index 0000000..ce08e36
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.HdfsStorageDescriptor;
+import org.apache.impala.catalog.RowFormat;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.thrift.TAccessEvent;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.THdfsFileFormat;
+import org.apache.impala.util.MetaStoreUtil;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.hadoop.fs.permission.FsAction;
+
+/**
+ * Represents the table parameters in a CREATE TABLE statement. These parameters
+ * correspond to the following clauses in a CREATE TABLE statement:
+ * - EXTERNAL
+ * - IF NOT EXISTS
+ * - PARTITIONED BY
+ * - DISTRIBUTE BY
+ * - ROWFORMAT
+ * - FILEFORMAT
+ * - COMMENT
+ * - SERDEPROPERTIES
+ * - TBLPROPERTIES
+ * - LOCATION
+ * - CACHED IN
+ */
+class TableDef {
+
+ // Name of the new table
+ private final TableName tableName_;
+
+ // List of column definitions
+ private final List<ColumnDef> columnDefs_ = Lists.newArrayList();
+
+ // Names of primary key columns. Populated by the parser. An empty list doesn't
+ // necessarily mean no primary keys were specified; columnDefs_ may also declare them.
+ private final List<String> primaryKeyColNames_ = Lists.newArrayList();
+
+ // Authoritative list of primary key column definitions populated during analysis.
+ private final List<ColumnDef> primaryKeyColDefs_ = Lists.newArrayList();
+
+ // If true, the table's data will be preserved if dropped.
+ private final boolean isExternal_;
+
+ // If true, no errors are thrown if the table already exists.
+ private final boolean ifNotExists_;
+
+ // Partitioned/distribute by parameters.
+ private final TableDataLayout dataLayout_;
+
+ // True if analyze() has been called.
+ private boolean isAnalyzed_ = false;
+
+ /**
+ * Set of table options. These options are grouped together for convenience while
+ * parsing CREATE TABLE statements, where they typically appear at the end of the
+ * statement.
+ */
+ static class Options {
+ // Comment to attach to the table
+ final String comment;
+
+ // Custom row format of the table. Leave null to specify default row format.
+ final RowFormat rowFormat;
+
+ // Key/values to persist with table serde metadata.
+ final Map<String, String> serdeProperties;
+
+ // File format of the table
+ final THdfsFileFormat fileFormat;
+
+ // The HDFS location where the table data will be stored.
+ final HdfsUri location;
+
+ // The HDFS caching op that should be applied to this table.
+ final HdfsCachingOp cachingOp;
+
+ // Key/values to persist with table metadata.
+ final Map<String, String> tblProperties;
+
+ Options(String comment, RowFormat rowFormat,
+ Map<String, String> serdeProperties, THdfsFileFormat fileFormat, HdfsUri location,
+ HdfsCachingOp cachingOp, Map<String, String> tblProperties) {
+ this.comment = comment;
+ this.rowFormat = rowFormat;
+ Preconditions.checkNotNull(serdeProperties);
+ this.serdeProperties = serdeProperties;
+ this.fileFormat = fileFormat == null ? THdfsFileFormat.TEXT : fileFormat;
+ this.location = location;
+ this.cachingOp = cachingOp;
+ Preconditions.checkNotNull(tblProperties);
+ this.tblProperties = tblProperties;
+ }
+
+ public Options(String comment) {
+ this(comment, RowFormat.DEFAULT_ROW_FORMAT, Maps.<String, String>newHashMap(),
+ THdfsFileFormat.TEXT, null, null, Maps.<String, String>newHashMap());
+ }
+ }
+
+ private Options options_;
+
+ // Result of analysis.
+ private TableName fqTableName_;
+
+ TableDef(TableName tableName, boolean isExternal, boolean ifNotExists) {
+ tableName_ = tableName;
+ isExternal_ = isExternal;
+ ifNotExists_ = ifNotExists;
+ dataLayout_ = TableDataLayout.createEmptyLayout();
+ }
+
+ public TableName getTblName() {
+ return fqTableName_ != null ? fqTableName_ : tableName_;
+ }
+ public String getTbl() { return tableName_.getTbl(); }
+ public boolean isAnalyzed() { return isAnalyzed_; }
+ List<ColumnDef> getColumnDefs() { return columnDefs_; }
+ List<ColumnDef> getPartitionColumnDefs() {
+ return dataLayout_.getPartitionColumnDefs();
+ }
+ List<String> getPrimaryKeyColumnNames() { return primaryKeyColNames_; }
+ List<ColumnDef> getPrimaryKeyColumnDefs() { return primaryKeyColDefs_; }
+ boolean isExternal() { return isExternal_; }
+ boolean getIfNotExists() { return ifNotExists_; }
+ List<DistributeParam> getDistributeParams() {
+ return dataLayout_.getDistributeParams();
+ }
+ void setOptions(Options options) {
+ Preconditions.checkNotNull(options);
+ options_ = options;
+ }
+ String getComment() { return options_.comment; }
+ Map<String, String> getTblProperties() { return options_.tblProperties; }
+ HdfsCachingOp getCachingOp() { return options_.cachingOp; }
+ HdfsUri getLocation() { return options_.location; }
+ Map<String, String> getSerdeProperties() { return options_.serdeProperties; }
+ THdfsFileFormat getFileFormat() { return options_.fileFormat; }
+ RowFormat getRowFormat() { return options_.rowFormat; }
+
+ /**
+ * Analyzes the parameters of a CREATE TABLE statement.
+ */
+ void analyze(Analyzer analyzer) throws AnalysisException {
+ Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
+ fqTableName_ = analyzer.getFqTableName(getTblName());
+ fqTableName_.analyze();
+ analyzeColumnDefs();
+ analyzePrimaryKeys();
+
+ if (analyzer.dbContainsTable(getTblName().getDb(), getTbl(), Privilege.CREATE)
+ && !getIfNotExists()) {
+ throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG + getTblName());
+ }
+
+ analyzer.addAccessEvent(new TAccessEvent(getTblName().toString(),
+ TCatalogObjectType.TABLE, Privilege.CREATE.toString()));
+
+ Preconditions.checkNotNull(options_);
+ analyzeOptions(analyzer);
+ isAnalyzed_ = true;
+ }
+
+ /**
+ * Analyzes table and partition column definitions, checking whether all column
+ * names are unique.
+ */
+ private void analyzeColumnDefs() throws AnalysisException {
+ Set<String> colNames = Sets.newHashSet();
+ for (ColumnDef colDef: columnDefs_) {
+ colDef.analyze();
+ if (!colNames.add(colDef.getColName().toLowerCase())) {
+ throw new AnalysisException("Duplicate column name: " + colDef.getColName());
+ }
+ }
+ for (ColumnDef colDef: getPartitionColumnDefs()) {
+ colDef.analyze();
+ if (!colDef.getType().supportsTablePartitioning()) {
+ throw new AnalysisException(
+ String.format("Type '%s' is not supported as partition-column type " +
+ "in column: %s", colDef.getType().toSql(), colDef.getColName()));
+ }
+ if (!colNames.add(colDef.getColName().toLowerCase())) {
+ throw new AnalysisException("Duplicate column name: " + colDef.getColName());
+ }
+ }
+ }
+
+ /**
+ * Analyzes the primary key columns. Checks if the specified primary key columns exist
+ * in the table column definitions and if composite primary keys are properly defined
+ * using the PRIMARY KEY (col, ..., col) clause.
+ */
+ private void analyzePrimaryKeys() throws AnalysisException {
+ for (ColumnDef colDef: columnDefs_) {
+ if (colDef.isPrimaryKey()) primaryKeyColDefs_.add(colDef);
+ }
+ if (primaryKeyColDefs_.size() > 1) {
+ throw new AnalysisException("Multiple primary keys specified. " +
+ "Composite primary keys can be specified using the " +
+ "PRIMARY KEY (col1, col2, ...) syntax at the end of the column definition.");
+ }
+ if (primaryKeyColNames_.isEmpty()) return;
+ if (!primaryKeyColDefs_.isEmpty()) {
+ throw new AnalysisException("Multiple primary keys specified. " +
+ "Composite primary keys can be specified using the " +
+ "PRIMARY KEY (col1, col2, ...) syntax at the end of the column definition.");
+ }
+ Map<String, ColumnDef> colDefsByColName = ColumnDef.mapByColumnNames(columnDefs_);
+ for (String colName: primaryKeyColNames_) {
+ colName = colName.toLowerCase();
+ ColumnDef colDef = colDefsByColName.remove(colName);
+ if (colDef == null) {
+ if (ColumnDef.toColumnNames(primaryKeyColDefs_).contains(colName)) {
+ throw new AnalysisException(String.format("Column '%s' is listed multiple " +
+ "times as a PRIMARY KEY.", colName));
+ }
+ throw new AnalysisException(String.format(
+ "PRIMARY KEY column '%s' does not exist in the table", colName));
+ }
+ primaryKeyColDefs_.add(colDef);
+ }
+ }
+
+ private void analyzeOptions(Analyzer analyzer) throws AnalysisException {
+ MetaStoreUtil.checkShortPropertyMap("Property", options_.tblProperties);
+ MetaStoreUtil.checkShortPropertyMap("Serde property", options_.serdeProperties);
+
+ if (options_.location != null) {
+ options_.location.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
+ }
+
+ if (options_.cachingOp != null) {
+ options_.cachingOp.analyze(analyzer);
+ if (options_.cachingOp.shouldCache() && options_.location != null &&
+ !FileSystemUtil.isPathCacheable(options_.location.getPath())) {
+ throw new AnalysisException(String.format("Location '%s' cannot be cached. " +
+ "Please retry without caching: CREATE TABLE ... UNCACHED",
+ options_.location));
+ }
+ }
+
+ // Analyze 'skip.header.line.format' property.
+ AlterTableSetTblProperties.analyzeSkipHeaderLineCount(options_.tblProperties);
+ analyzeRowFormat(analyzer);
+ }
+
+ private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException {
+ if (options_.rowFormat == null) return;
+ if (options_.fileFormat == THdfsFileFormat.KUDU) {
+ throw new AnalysisException(String.format(
+ "ROW FORMAT cannot be specified for file format %s.", options_.fileFormat));
+ }
+
+ Byte fieldDelim = analyzeRowFormatValue(options_.rowFormat.getFieldDelimiter());
+ Byte lineDelim = analyzeRowFormatValue(options_.rowFormat.getLineDelimiter());
+ Byte escapeChar = analyzeRowFormatValue(options_.rowFormat.getEscapeChar());
+ if (options_.fileFormat == THdfsFileFormat.TEXT) {
+ if (fieldDelim == null) fieldDelim = HdfsStorageDescriptor.DEFAULT_FIELD_DELIM;
+ if (lineDelim == null) lineDelim = HdfsStorageDescriptor.DEFAULT_LINE_DELIM;
+ if (escapeChar == null) escapeChar = HdfsStorageDescriptor.DEFAULT_ESCAPE_CHAR;
+ if (fieldDelim.equals(lineDelim)) {
+ throw new AnalysisException("Field delimiter and line delimiter have same " +
+ "value: byte " + fieldDelim);
+ }
+ if (fieldDelim.equals(escapeChar)) {
+ analyzer.addWarning("Field delimiter and escape character have same value: " +
+ "byte " + fieldDelim + ". Escape character will be ignored");
+ }
+ if (lineDelim.equals(escapeChar)) {
+ analyzer.addWarning("Line delimiter and escape character have same value: " +
+ "byte " + lineDelim + ". Escape character will be ignored");
+ }
+ }
+ }
+
+ private Byte analyzeRowFormatValue(String value) throws AnalysisException {
+ if (value == null) return null;
+ Byte byteVal = HdfsStorageDescriptor.parseDelim(value);
+ if (byteVal == null) {
+ throw new AnalysisException("ESCAPED BY values and LINE/FIELD " +
+ "terminators must be specified as a single character or as a decimal " +
+ "value in the range [-128:127]: " + value);
+ }
+ return byteVal;
+ }
+}
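analyzePrimaryKeys() above accepts a key declared either inline on a single
column or through the table-level clause, but not both and not inline on more
than one column. A sketch (names illustrative):

  -- Single-column key, declared inline in the column definition:
  CREATE TABLE t1 (id BIGINT PRIMARY KEY, s STRING)
  DISTRIBUTE BY HASH (id) INTO 4 BUCKETS STORED AS KUDU;

  -- Composite key, declared with the table-level clause:
  CREATE TABLE t2 (a BIGINT, b STRING, c DOUBLE, PRIMARY KEY (a, b))
  DISTRIBUTE BY HASH (a, b) INTO 4 BUCKETS STORED AS KUDU;

  -- Rejected with "Multiple primary keys specified. ...":
  CREATE TABLE t3 (a BIGINT PRIMARY KEY, b STRING PRIMARY KEY) STORED AS KUDU;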
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index b125987..aa24336 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -22,10 +22,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.impala.catalog.KuduTable;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.antlr.runtime.ANTLRStringStream;
import org.antlr.runtime.Token;
import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.ql.parse.HiveLexer;
@@ -35,16 +41,11 @@ import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.HBaseTable;
import org.apache.impala.catalog.HdfsCompression;
import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.RowFormat;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.View;
-import org.apache.impala.common.PrintUtils;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import org.apache.impala.util.KuduUtil;
/**
* Contains utility methods for creating SQL strings, for example,
@@ -132,8 +133,9 @@ public class ToSqlUtils {
}
// TODO: Pass the correct compression, if applicable.
return getCreateTableSql(stmt.getDb(), stmt.getTbl(), stmt.getComment(), colsSql,
- partitionColsSql, stmt.getTblProperties(), stmt.getSerdeProperties(),
- stmt.isExternal(), stmt.getIfNotExists(), stmt.getRowFormat(),
+ partitionColsSql, stmt.getTblPrimaryKeyColumnNames(), null,
+ stmt.getTblProperties(), stmt.getSerdeProperties(), stmt.isExternal(),
+ stmt.getIfNotExists(), stmt.getRowFormat(),
HdfsFileFormat.fromThrift(stmt.getFileFormat()), HdfsCompression.NONE, null,
stmt.getLocation());
}
@@ -152,7 +154,8 @@ public class ToSqlUtils {
}
// TODO: Pass the correct compression, if applicable.
String createTableSql = getCreateTableSql(innerStmt.getDb(), innerStmt.getTbl(),
- innerStmt.getComment(), null, partitionColsSql, innerStmt.getTblProperties(),
+ innerStmt.getComment(), null, partitionColsSql,
+ innerStmt.getTblPrimaryKeyColumnNames(), null, innerStmt.getTblProperties(),
innerStmt.getSerdeProperties(), innerStmt.isExternal(),
innerStmt.getIfNotExists(), innerStmt.getRowFormat(),
HdfsFileFormat.fromThrift(innerStmt.getFileFormat()), HdfsCompression.NONE, null,
@@ -169,6 +172,9 @@ public class ToSqlUtils {
if (table instanceof View) return getCreateViewSql((View)table);
org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable();
HashMap<String, String> properties = Maps.newHashMap(msTable.getParameters());
+ if (properties.containsKey("transient_lastDdlTime")) {
+ properties.remove("transient_lastDdlTime");
+ }
boolean isExternal = msTable.getTableType() != null &&
msTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString());
String comment = properties.get("comment");
@@ -194,17 +200,40 @@ public class ToSqlUtils {
Map<String, String> serdeParameters = msTable.getSd().getSerdeInfo().getParameters();
String storageHandlerClassName = table.getStorageHandlerClassName();
+ List<String> primaryKeySql = Lists.newArrayList();
+ String kuduDistributeByParams = null;
if (table instanceof KuduTable) {
+ KuduTable kuduTable = (KuduTable) table;
// Kudu tables don't use LOCATION syntax
location = null;
- format = null;
+ format = HdfsFileFormat.KUDU;
// Kudu tables cannot use the Hive DDL syntax for the storage handler
storageHandlerClassName = null;
+ properties.remove(KuduTable.KEY_STORAGE_HANDLER);
+ String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME);
+ Preconditions.checkNotNull(kuduTableName);
+ if (kuduTableName.equals(KuduUtil.getDefaultCreateKuduTableName(
+ table.getDb().getName(), table.getName()))) {
+ properties.remove(KuduTable.KEY_TABLE_NAME);
+ }
+ // Internal property, should not be exposed to the user.
+ properties.remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
+
+ if (!isExternal) {
+ primaryKeySql.addAll(kuduTable.getPrimaryKeyColumnNames());
+
+ List<String> paramsSql = Lists.newArrayList();
+ for (DistributeParam param: kuduTable.getDistributeBy()) {
+ paramsSql.add(param.toSql());
+ }
+ kuduDistributeByParams = Joiner.on(", ").join(paramsSql);
+ }
}
HdfsUri tableLocation = location == null ? null : new HdfsUri(location);
return getCreateTableSql(table.getDb().getName(), table.getName(), comment, colsSql,
- partitionColsSql, properties, serdeParameters, isExternal, false, rowFormat,
- format, compression, storageHandlerClassName, tableLocation);
+ partitionColsSql, primaryKeySql, kuduDistributeByParams, properties,
+ serdeParameters, isExternal, false, rowFormat, format, compression,
+ storageHandlerClassName, tableLocation);
}
/**
@@ -214,6 +243,7 @@ public class ToSqlUtils {
*/
public static String getCreateTableSql(String dbName, String tableName,
String tableComment, List<String> columnsSql, List<String> partitionColumnsSql,
+ List<String> primaryKeysSql, String kuduDistributeByParams,
Map<String, String> tblProperties, Map<String, String> serdeParameters,
boolean isExternal, boolean ifNotExists, RowFormat rowFormat,
HdfsFileFormat fileFormat, HdfsCompression compression, String storageHandlerClass,
@@ -227,7 +257,11 @@ public class ToSqlUtils {
sb.append(tableName);
if (columnsSql != null) {
sb.append(" (\n ");
- sb.append(Joiner.on(", \n ").join(columnsSql));
+ sb.append(Joiner.on(",\n ").join(columnsSql));
+ if (!primaryKeysSql.isEmpty()) {
+ sb.append(",\n PRIMARY KEY (");
+ Joiner.on(", ").appendTo(sb, primaryKeysSql).append(")");
+ }
sb.append("\n)");
}
sb.append("\n");
@@ -238,6 +272,10 @@ public class ToSqlUtils {
Joiner.on(", \n ").join(partitionColumnsSql)));
}
+ if (kuduDistributeByParams != null) {
+ sb.append("DISTRIBUTE BY " + kuduDistributeByParams + "\n");
+ }
+
if (rowFormat != null && !rowFormat.isDefault()) {
sb.append("ROW FORMAT DELIMITED");
if (rowFormat.getFieldDelimiter() != null) {
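Taken together, the ToSqlUtils changes above thread the Kudu-specific pieces (primary key columns and DISTRIBUTE BY parameters) into the generated DDL. A standalone sketch of the clause assembly, using the same Guava Joiner pattern as the patch; the table and column names are made up for illustration:

  import com.google.common.base.Joiner;

  import java.util.Arrays;
  import java.util.List;

  public class KuduDdlSketch {
    public static void main(String[] args) {
      List<String> columnsSql = Arrays.asList("id BIGINT", "name STRING");
      List<String> primaryKeysSql = Arrays.asList("id");
      String kuduDistributeByParams = "HASH (id) INTO 16 BUCKETS";

      StringBuilder sb = new StringBuilder("CREATE TABLE t (\n  ");
      sb.append(Joiner.on(",\n  ").join(columnsSql));
      if (!primaryKeysSql.isEmpty()) {
        // Mirrors the new PRIMARY KEY clause added in getCreateTableSql().
        sb.append(",\n  PRIMARY KEY (");
        Joiner.on(", ").appendTo(sb, primaryKeysSql).append(")");
      }
      sb.append("\n)\n");
      if (kuduDistributeByParams != null) {
        sb.append("DISTRIBUTE BY " + kuduDistributeByParams + "\n");
      }
      sb.append("STORED AS KUDU");
      System.out.println(sb);
    }
  }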
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 41573ed..733b2f2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -519,4 +519,8 @@ public abstract class Catalog {
}
return result;
}
+
+ public static boolean isDefaultDb(String dbName) {
+ return DEFAULT_DB.equals(dbName.toLowerCase());
+ }
}
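The new helper is a case-insensitive check against the built-in default database. A self-contained illustration of its semantics (in Impala's Catalog, DEFAULT_DB is "default"; the standalone class below is only a demo):

  public class IsDefaultDbDemo {
    static final String DEFAULT_DB = "default";

    static boolean isDefaultDb(String dbName) {
      // Case-insensitive; note a null dbName would throw NullPointerException.
      return DEFAULT_DB.equals(dbName.toLowerCase());
    }

    public static void main(String[] args) {
      System.out.println(isDefaultDb("DEFAULT"));  // true
      System.out.println(isDefaultDb("tpch"));     // false
    }
  }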
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 41c8d62..149b00b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -23,7 +23,6 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -41,7 +40,6 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FunctionType;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.ResourceType;
@@ -52,7 +50,6 @@ import org.apache.log4j.Logger;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.TException;
-import org.apache.impala.analysis.TableName;
import org.apache.impala.authorization.SentryConfig;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.common.FileSystemUtil;
@@ -65,7 +62,6 @@ import org.apache.impala.thrift.TCatalog;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TFunctionBinaryType;
import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TPrivilege;
@@ -79,7 +75,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.io.Files;
/**
* Specialized Catalog that implements the CatalogService specific Catalog
@@ -693,7 +688,7 @@ public class CatalogServiceCatalog extends Catalog {
* Adds a table with the given name to the catalog and returns the new table,
* loading the metadata if needed.
*/
- public Table addTable(String dbName, String tblName) throws TableNotFoundException {
+ public Table addTable(String dbName, String tblName) {
Db db = getDb(dbName);
if (db == null) return null;
Table incompleteTable =
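With the checked TableNotFoundException dropped from the signature, addTable() now signals a missing database by returning null. A stub that mimics the control flow only (not Impala's real catalog classes):

  import java.util.HashMap;
  import java.util.Map;

  public class AddTableContract {
    static final Map<String, Map<String, String>> DBS = new HashMap<>();

    static String addTable(String dbName, String tblName) {
      Map<String, String> db = DBS.get(dbName);
      if (db == null) return null;  // missing db -> null, no checked exception
      return db.computeIfAbsent(tblName, k -> "IncompleteTable:" + k);
    }

    public static void main(String[] args) {
      System.out.println(addTable("no_such_db", "t"));   // null
      DBS.put("tpch", new HashMap<>());
      System.out.println(addTable("tpch", "lineitem"));  // IncompleteTable:lineitem
    }
  }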
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index d6fb185..0ed67c6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -152,6 +152,11 @@ public class Db implements CatalogObject {
return Lists.newArrayList(tableCache_.keySet());
}
+ /**
+ * Returns the tables in the cache.
+ */
+ public List<Table> getTables() { return tableCache_.getValues(); }
+
public boolean containsTable(String tableName) {
return tableCache_.contains(tableName.toLowerCase());
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
index 86a65bd..e4fce60 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
@@ -31,8 +31,12 @@ import com.google.common.collect.Lists;
* 2) the output format class
* 3) the serialization library class
* 4) whether scanning complex types from it is supported
+ * 5) whether the file format can skip complex columns in scans and just materialize
+ * scalar typed columns
*
* Important note: Always keep consistent with the classes used in Hive.
+ * TODO: Kudu doesn't belong in this list. Either rename this enum or create a separate
+ * list of storage engines (see IMPALA-4178).
*/
public enum HdfsFileFormat {
RC_FILE("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
@@ -57,7 +61,10 @@ public enum HdfsFileFormat {
PARQUET("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
- true, true);
+ true, true),
+ KUDU("org.apache.kudu.mapreduce.KuduTableInputFormat",
+ "org.apache.kudu.mapreduce.KuduTableOutputFormat",
+ "", false, false);
private final String inputFormat_;
private final String outputFormat_;
@@ -103,6 +110,7 @@ public enum HdfsFileFormat {
.put(PARQUET_LEGACY_INPUT_FORMATS[0], PARQUET)
.put(PARQUET_LEGACY_INPUT_FORMATS[1], PARQUET)
.put(PARQUET_LEGACY_INPUT_FORMATS[2], PARQUET)
+ .put(KUDU.inputFormat(), KUDU)
.build();
/**
@@ -138,6 +146,7 @@ public enum HdfsFileFormat {
case SEQUENCE_FILE: return HdfsFileFormat.SEQUENCE_FILE;
case AVRO: return HdfsFileFormat.AVRO;
case PARQUET: return HdfsFileFormat.PARQUET;
+ case KUDU: return HdfsFileFormat.KUDU;
default:
throw new RuntimeException("Unknown THdfsFileFormat: "
+ thriftFormat + " - should never happen!");
@@ -151,6 +160,7 @@ public enum HdfsFileFormat {
case SEQUENCE_FILE: return THdfsFileFormat.SEQUENCE_FILE;
case AVRO: return THdfsFileFormat.AVRO;
case PARQUET: return THdfsFileFormat.PARQUET;
+ case KUDU: return THdfsFileFormat.KUDU;
default:
throw new RuntimeException("Unknown HdfsFormat: "
+ this + " - should never happen!");
@@ -173,6 +183,7 @@ public enum HdfsFileFormat {
case SEQUENCE_FILE: return "SEQUENCEFILE";
case AVRO: return "AVRO";
case PARQUET: return "PARQUET";
+ case KUDU: return "KUDU";
default:
throw new RuntimeException("Unknown HdfsFormat: "
+ this + " - should never happen!");
@@ -230,6 +241,8 @@ public enum HdfsFileFormat {
case AVRO:
case PARQUET:
return true;
+ case KUDU:
+ return false;
default:
throw new RuntimeException("Unknown HdfsFormat: "
+ this + " - should never happen!");
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index c416bee..3647256 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -87,12 +87,17 @@ public class ImpaladCatalog extends Catalog {
// Object that is used to synchronize on and signal when a catalog update is received.
private final Object catalogUpdateEventNotifier_ = new Object();
+ // The addresses of the Kudu masters to use if no Kudu masters were explicitly provided.
+ // Used during table creation.
+ private final String defaultKuduMasterHosts_;
+
/**
* C'tor used by tests that need to validate the ImpaladCatalog outside of the
* CatalogServer.
*/
- public ImpaladCatalog() {
+ public ImpaladCatalog(String defaultKuduMasterHosts) {
super(false);
+ defaultKuduMasterHosts_ = defaultKuduMasterHosts;
}
/**
@@ -445,4 +450,5 @@ public class ImpaladCatalog extends Catalog {
// Only used for testing.
public void setIsReady(boolean isReady) { isReady_.set(isReady); }
public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
+ public String getDefaultKuduMasterHosts() { return defaultKuduMasterHosts_; }
}
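The ImpaladCatalog change plumbs a default Kudu master address list through the test constructor so that CREATE TABLE analysis can fall back to it when the statement names no masters. A trivial stub of that shape (the host:port value is a placeholder, not a project default):

  public class ImpaladCatalogStub {
    private final String defaultKuduMasterHosts;

    ImpaladCatalogStub(String defaultKuduMasterHosts) {
      this.defaultKuduMasterHosts = defaultKuduMasterHosts;
    }

    String getDefaultKuduMasterHosts() { return defaultKuduMasterHosts; }

    public static void main(String[] args) {
      ImpaladCatalogStub c = new ImpaladCatalogStub("kudu-master-1:7051");
      System.out.println(c.getDefaultKuduMasterHosts());  // kudu-master-1:7051
    }
  }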