Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/22 05:33:34 UTC

[07/14] incubator-impala git commit: IMPALA-3719: Simplify CREATE TABLE statements with Kudu tables

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 3acb1a3..73173cb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -17,23 +17,12 @@
 
 package org.apache.impala.analysis;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaParseException;
-import org.apache.hadoop.fs.permission.FsAction;
-
-import org.apache.impala.authorization.Privilege;
-import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.common.AnalysisException;
-import org.apache.impala.common.FileSystemUtil;
-import org.apache.impala.thrift.TAccessEvent;
-import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TTableName;
@@ -41,113 +30,80 @@ import org.apache.impala.util.AvroSchemaConverter;
 import org.apache.impala.util.AvroSchemaParser;
 import org.apache.impala.util.AvroSchemaUtils;
 import org.apache.impala.util.KuduUtil;
-import org.apache.impala.util.MetaStoreUtil;
+
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
 
 /**
  * Represents a CREATE TABLE statement.
  */
 public class CreateTableStmt extends StatementBase {
-  private List<ColumnDef> columnDefs_;
-  private final String comment_;
-  private final boolean isExternal_;
-  private final boolean ifNotExists_;
-  private final THdfsFileFormat fileFormat_;
-  private final ArrayList<ColumnDef> partitionColDefs_;
-  private final RowFormat rowFormat_;
-  private TableName tableName_;
-  private final Map<String, String> tblProperties_;
-  private final Map<String, String> serdeProperties_;
-  private final HdfsCachingOp cachingOp_;
-  private HdfsUri location_;
-  private final List<DistributeParam> distributeParams_;
-
-  // Set during analysis
+
+  @VisibleForTesting
+  final static String KUDU_STORAGE_HANDLER_ERROR_MESSAGE = "Kudu tables must be"
+      + " specified using 'STORED AS KUDU' without using the storage handler table"
+      + " property.";
+
+  // Table parameters specified in a CREATE TABLE statement
+  private final TableDef tableDef_;
+
+  // Table owner. Set during analysis
   private String owner_;
 
-  /**
-   * Builds a CREATE TABLE statement
-   * @param tableName - Name of the new table
-   * @param columnDefs - List of column definitions for the table
-   * @param partitionColumnDefs - List of partition column definitions for the table
-   * @param isExternal - If true, the table's data will be preserved if dropped.
-   * @param comment - Comment to attach to the table
-   * @param rowFormat - Custom row format of the table. Use RowFormat.DEFAULT_ROW_FORMAT
-   *          to specify default row format.
-   * @param fileFormat - File format of the table
-   * @param location - The HDFS location of where the table data will stored.
-   * @param cachingOp - The HDFS caching op that should be applied to this table.
-   * @param ifNotExists - If true, no errors are thrown if the table already exists.
-   * @param tblProperties - Optional map of key/values to persist with table metadata.
-   * @param serdeProperties - Optional map of key/values to persist with table serde
-   *                          metadata.
-   */
-  public CreateTableStmt(TableName tableName, List<ColumnDef> columnDefs,
-      List<ColumnDef> partitionColumnDefs, boolean isExternal, String comment,
-      RowFormat rowFormat, THdfsFileFormat fileFormat, HdfsUri location,
-      HdfsCachingOp cachingOp, boolean ifNotExists, Map<String, String> tblProperties,
-      Map<String, String> serdeProperties, List<DistributeParam> distributeParams) {
-    Preconditions.checkNotNull(columnDefs);
-    Preconditions.checkNotNull(partitionColumnDefs);
-    Preconditions.checkNotNull(fileFormat);
-    Preconditions.checkNotNull(rowFormat);
-    Preconditions.checkNotNull(tableName);
-
-    columnDefs_ = Lists.newArrayList(columnDefs);
-    comment_ = comment;
-    isExternal_ = isExternal;
-    ifNotExists_ = ifNotExists;
-    fileFormat_ = fileFormat;
-    location_ = location;
-    cachingOp_ = cachingOp;
-    partitionColDefs_ = Lists.newArrayList(partitionColumnDefs);
-    rowFormat_ = rowFormat;
-    tableName_ = tableName;
-    tblProperties_ = tblProperties;
-    serdeProperties_ = serdeProperties;
-    unescapeProperties(tblProperties_);
-    unescapeProperties(serdeProperties_);
-    distributeParams_ = distributeParams;
+  public CreateTableStmt(TableDef tableDef) {
+    Preconditions.checkNotNull(tableDef);
+    tableDef_ = tableDef;
   }
 
   /**
    * Copy c'tor.
    */
-  public CreateTableStmt(CreateTableStmt other) {
-    columnDefs_ = Lists.newArrayList(other.columnDefs_);
-    comment_ = other.comment_;
-    isExternal_ = other.isExternal_;
-    ifNotExists_ = other.ifNotExists_;
-    fileFormat_ = other.fileFormat_;
-    location_ = other.location_;
-    cachingOp_ = other.cachingOp_;
-    partitionColDefs_ = Lists.newArrayList(other.partitionColDefs_);
-    rowFormat_ = other.rowFormat_;
-    tableName_ = other.tableName_;
-    tblProperties_ = other.tblProperties_;
-    serdeProperties_ = other.serdeProperties_;
-    distributeParams_ = other.distributeParams_;
+  CreateTableStmt(CreateTableStmt other) {
+    this(other.tableDef_);
+    owner_ = other.owner_;
   }
 
   @Override
   public CreateTableStmt clone() { return new CreateTableStmt(this); }
 
-  public String getTbl() { return tableName_.getTbl(); }
-  public TableName getTblName() { return tableName_; }
-  public List<ColumnDef> getColumnDefs() { return columnDefs_; }
-  public List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; }
-  public String getComment() { return comment_; }
-  public boolean isExternal() { return isExternal_; }
-  public boolean getIfNotExists() { return ifNotExists_; }
-  public HdfsUri getLocation() { return location_; }
-  public void setLocation(HdfsUri location) { this.location_ = location; }
-  public THdfsFileFormat getFileFormat() { return fileFormat_; }
-  public RowFormat getRowFormat() { return rowFormat_; }
-  public Map<String, String> getTblProperties() { return tblProperties_; }
-  public Map<String, String> getSerdeProperties() { return serdeProperties_; }
+  public String getTbl() { return getTblName().getTbl(); }
+  public TableName getTblName() { return tableDef_.getTblName(); }
+  public boolean getIfNotExists() { return tableDef_.getIfNotExists(); }
+  public List<ColumnDef> getColumnDefs() { return tableDef_.getColumnDefs(); }
+  private void setColumnDefs(List<ColumnDef> colDefs) {
+    getColumnDefs().clear();
+    getColumnDefs().addAll(colDefs);
+  }
+  private List<ColumnDef> getPrimaryKeyColumnDefs() {
+    return tableDef_.getPrimaryKeyColumnDefs();
+  }
+  public boolean isExternal() { return tableDef_.isExternal(); }
+  public List<ColumnDef> getPartitionColumnDefs() {
+    return tableDef_.getPartitionColumnDefs();
+  }
+  public List<DistributeParam> getDistributeParams() {
+    return tableDef_.getDistributeParams();
+  }
+  public String getComment() { return tableDef_.getComment(); }
+  Map<String, String> getTblProperties() { return tableDef_.getTblProperties(); }
+  private HdfsCachingOp getCachingOp() { return tableDef_.getCachingOp(); }
+  public HdfsUri getLocation() { return tableDef_.getLocation(); }
+  Map<String, String> getSerdeProperties() { return tableDef_.getSerdeProperties(); }
+  public THdfsFileFormat getFileFormat() { return tableDef_.getFileFormat(); }
+  RowFormat getRowFormat() { return tableDef_.getRowFormat(); }
+
+  // Only exposed for ToSqlUtils. Returns the list of primary keys declared by the user
+  // at the table level. Note that primary keys may also be declared in column
+  // definitions; those are not included here (they are stored in the ColumnDefs).
+  List<String> getTblPrimaryKeyColumnNames() {
+    return tableDef_.getPrimaryKeyColumnNames();
+  }
 
   /**
    * Can only be called after analysis, returns the owner of this table (the user from
@@ -164,7 +120,7 @@ public class CreateTableStmt extends StatementBase {
    */
   public String getDb() {
     Preconditions.checkState(isAnalyzed());
-    return tableName_.getDb();
+    return getTblName().getDb();
   }
 
   @Override
@@ -173,240 +129,246 @@ public class CreateTableStmt extends StatementBase {
   public TCreateTableParams toThrift() {
     TCreateTableParams params = new TCreateTableParams();
     params.setTable_name(new TTableName(getDb(), getTbl()));
-    for (ColumnDef col: getColumnDefs()) {
-      params.addToColumns(col.toThrift());
-    }
+    List<org.apache.impala.thrift.TColumn> tColumns = Lists.newArrayList();
+    for (ColumnDef col: getColumnDefs()) tColumns.add(col.toThrift());
+    params.setColumns(tColumns);
     for (ColumnDef col: getPartitionColumnDefs()) {
       params.addToPartition_columns(col.toThrift());
     }
     params.setOwner(getOwner());
     params.setIs_external(isExternal());
-    params.setComment(comment_);
-    params.setLocation(location_ == null ? null : location_.toString());
-    if (cachingOp_ != null) params.setCache_op(cachingOp_.toThrift());
-    params.setRow_format(rowFormat_.toThrift());
-    params.setFile_format(fileFormat_);
+    params.setComment(getComment());
+    params.setLocation(getLocation() == null ? null : getLocation().toString());
+    if (getCachingOp() != null) params.setCache_op(getCachingOp().toThrift());
+    if (getRowFormat() != null) params.setRow_format(getRowFormat().toThrift());
+    params.setFile_format(getFileFormat());
     params.setIf_not_exists(getIfNotExists());
-    if (tblProperties_ != null) params.setTable_properties(tblProperties_);
-    if (serdeProperties_ != null) params.setSerde_properties(serdeProperties_);
-    if (distributeParams_ != null) {
-      for (DistributeParam d : distributeParams_) {
-        params.addToDistribute_by(d.toThrift());
-      }
+    params.setTable_properties(getTblProperties());
+    params.setSerde_properties(getSerdeProperties());
+    for (DistributeParam d: getDistributeParams()) {
+      params.addToDistribute_by(d.toThrift());
+    }
+    for (ColumnDef pkColDef: getPrimaryKeyColumnDefs()) {
+      params.addToPrimary_key_column_names(pkColDef.getColName());
     }
+
     return params;
   }
 
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
     super.analyze(analyzer);
-    Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
-    tableName_ = analyzer.getFqTableName(tableName_);
-    tableName_.analyze();
     owner_ = analyzer.getUser().getName();
-
-    MetaStoreUtil.checkShortPropertyMap("Property", tblProperties_);
-    MetaStoreUtil.checkShortPropertyMap("Serde property", serdeProperties_);
-
-    if (analyzer.dbContainsTable(tableName_.getDb(), tableName_.getTbl(),
-        Privilege.CREATE) && !ifNotExists_) {
-      throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG + tableName_);
-    }
-
-    analyzer.addAccessEvent(new TAccessEvent(tableName_.toString(),
-        TCatalogObjectType.TABLE, Privilege.CREATE.toString()));
-
-    // Only Avro tables can have empty column defs because they can infer them from
-    // the Avro schema.
-    if (columnDefs_.isEmpty() && fileFormat_ != THdfsFileFormat.AVRO) {
+    tableDef_.analyze(analyzer);
+    analyzeKuduFormat(analyzer);
+    // Avro tables can have empty column defs because they can infer them from the Avro
+    // schema. Likewise for external Kudu tables, the schema can be read from Kudu.
+    if (getColumnDefs().isEmpty() && getFileFormat() != THdfsFileFormat.AVRO
+        && getFileFormat() != THdfsFileFormat.KUDU) {
       throw new AnalysisException("Table requires at least 1 column");
     }
-
-    if (location_ != null) {
-      location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
-    }
-
-    analyzeRowFormat(analyzer);
-
-    // Check that all the column names are valid and unique.
-    analyzeColumnDefs(analyzer);
-
-    if (getTblProperties() != null && KuduTable.KUDU_STORAGE_HANDLER.equals(
-        getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER))) {
-      analyzeKuduTable(analyzer);
-    } else if (distributeParams_ != null) {
-      throw new AnalysisException("Only Kudu tables can use DISTRIBUTE BY clause.");
-    }
-
-    if (fileFormat_ == THdfsFileFormat.AVRO) {
-      columnDefs_ = analyzeAvroSchema(analyzer);
-      if (columnDefs_.isEmpty()) {
+    if (getFileFormat() == THdfsFileFormat.AVRO) {
+      setColumnDefs(analyzeAvroSchema(analyzer));
+      if (getColumnDefs().isEmpty()) {
         throw new AnalysisException(
             "An Avro table requires column definitions or an Avro schema.");
       }
-      AvroSchemaUtils.setFromSerdeComment(columnDefs_);
-      analyzeColumnDefs(analyzer);
+      AvroSchemaUtils.setFromSerdeComment(getColumnDefs());
     }
+  }
 
-    if (cachingOp_ != null) {
-      cachingOp_.analyze(analyzer);
-      if (cachingOp_.shouldCache() && location_ != null &&
-          !FileSystemUtil.isPathCacheable(location_.getPath())) {
-        throw new AnalysisException(String.format("Location '%s' cannot be cached. " +
-            "Please retry without caching: CREATE TABLE %s ... UNCACHED",
-            location_.toString(), tableName_));
+  /**
+   * Analyzes the parameters of a CREATE TABLE ... STORED AS KUDU statement. Also checks
+   * that Kudu-specific properties and parameters are not specified for non-Kudu tables.
+   */
+  private void analyzeKuduFormat(Analyzer analyzer) throws AnalysisException {
+    if (getFileFormat() != THdfsFileFormat.KUDU) {
+      if (KuduTable.KUDU_STORAGE_HANDLER.equals(
+          getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER))) {
+        throw new AnalysisException(KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
+      }
+      AnalysisUtils.throwIfNotEmpty(getDistributeParams(),
+          "Only Kudu tables can use the DISTRIBUTE BY clause.");
+      if (hasPrimaryKey()) {
+        throw new AnalysisException("Only Kudu tables can specify a PRIMARY KEY.");
       }
+      return;
     }
 
-    // Analyze 'skip.header.line.format' property.
-    if (tblProperties_ != null) {
-      AlterTableSetTblProperties.analyzeSkipHeaderLineCount(tblProperties_);
+    analyzeKuduTableProperties(analyzer);
+    if (isExternal()) {
+      analyzeExternalKuduTableParams();
+    } else {
+      analyzeManagedKuduTableParams(analyzer);
     }
   }
 
-  private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException {
-    Byte fieldDelim = analyzeRowFormatValue(rowFormat_.getFieldDelimiter());
-    Byte lineDelim = analyzeRowFormatValue(rowFormat_.getLineDelimiter());
-    Byte escapeChar = analyzeRowFormatValue(rowFormat_.getEscapeChar());
-    if (fileFormat_ == THdfsFileFormat.TEXT) {
-      if (fieldDelim == null) fieldDelim = HdfsStorageDescriptor.DEFAULT_FIELD_DELIM;
-      if (lineDelim == null) lineDelim = HdfsStorageDescriptor.DEFAULT_LINE_DELIM;
-      if (escapeChar == null) escapeChar = HdfsStorageDescriptor.DEFAULT_ESCAPE_CHAR;
-      if (fieldDelim != null && lineDelim != null && fieldDelim.equals(lineDelim)) {
-        throw new AnalysisException("Field delimiter and line delimiter have same " +
-            "value: byte " + fieldDelim);
-      }
-      if (fieldDelim != null && escapeChar != null && fieldDelim.equals(escapeChar)) {
-        analyzer.addWarning("Field delimiter and escape character have same value: " +
-            "byte " + fieldDelim + ". Escape character will be ignored");
-      }
-      if (lineDelim != null && escapeChar != null && lineDelim.equals(escapeChar)) {
-        analyzer.addWarning("Line delimiter and escape character have same value: " +
-            "byte " + lineDelim + ". Escape character will be ignored");
+  /**
+   * Analyzes and checks table properties which are common to both managed and external
+   * Kudu tables.
+   */
+  private void analyzeKuduTableProperties(Analyzer analyzer) throws AnalysisException {
+    if (getTblProperties().containsKey(KuduTable.KEY_STORAGE_HANDLER)) {
+      throw new AnalysisException(KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
+    }
+    getTblProperties().put(KuduTable.KEY_STORAGE_HANDLER, KuduTable.KUDU_STORAGE_HANDLER);
+
+    String masterHosts = getTblProperties().get(KuduTable.KEY_MASTER_HOSTS);
+    if (Strings.isNullOrEmpty(masterHosts)) {
+      masterHosts = analyzer.getCatalog().getDefaultKuduMasterHosts();
+      if (masterHosts.isEmpty()) {
+        throw new AnalysisException(String.format(
+            "Table property '%s' is required when the impalad startup flag " +
+            "-kudu_master_hosts is not used.", KuduTable.KEY_MASTER_HOSTS));
       }
+      getTblProperties().put(KuduTable.KEY_MASTER_HOSTS, masterHosts);
     }
+
+    // TODO: Find out what is creating a directory in HDFS and stop doing that. Kudu
+    //       tables shouldn't have HDFS dirs.
+    //       https://issues.cloudera.org/browse/IMPALA-3570
+    AnalysisUtils.throwIfNotNull(getCachingOp(),
+        "A Kudu table cannot be cached in HDFS.");
+    AnalysisUtils.throwIfNotNull(getLocation(), "LOCATION cannot be specified for a " +
+        "Kudu table.");
+    AnalysisUtils.throwIfNotEmpty(tableDef_.getPartitionColumnDefs(),
+        "PARTITIONED BY cannot be used in Kudu tables.");
   }
 
   /**
-   * Analyzes columnDefs_ and partitionColDefs_ checking whether all column
-   * names are unique.
+   * Analyzes and checks parameters specified for external Kudu tables.
    */
-  private void analyzeColumnDefs(Analyzer analyzer) throws AnalysisException {
-    Set<String> colNames = Sets.newHashSet();
-    for (ColumnDef colDef: columnDefs_) {
-      colDef.analyze();
-      if (!colNames.add(colDef.getColName().toLowerCase())) {
-        throw new AnalysisException("Duplicate column name: " + colDef.getColName());
-      }
+  private void analyzeExternalKuduTableParams() throws AnalysisException {
+    AnalysisUtils.throwIfNull(getTblProperties().get(KuduTable.KEY_TABLE_NAME),
+        String.format("Table property %s must be specified when creating " +
+            "an external Kudu table.", KuduTable.KEY_TABLE_NAME));
+    if (hasPrimaryKey()
+        || getTblProperties().containsKey(KuduTable.KEY_KEY_COLUMNS)) {
+      throw new AnalysisException("Primary keys cannot be specified for an external " +
+          "Kudu table");
     }
-    for (ColumnDef colDef: partitionColDefs_) {
-      colDef.analyze();
-      if (!colDef.getType().supportsTablePartitioning()) {
-        throw new AnalysisException(
-            String.format("Type '%s' is not supported as partition-column type " +
-                "in column: %s", colDef.getType().toSql(), colDef.getColName()));
+    AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_TABLET_REPLICAS),
+        String.format("Table property '%s' cannot be used with an external Kudu table.",
+            KuduTable.KEY_TABLET_REPLICAS));
+    AnalysisUtils.throwIfNotEmpty(getColumnDefs(),
+        "Columns cannot be specified with an external Kudu table.");
+    AnalysisUtils.throwIfNotEmpty(getDistributeParams(),
+        "DISTRIBUTE BY cannot be used with an external Kudu table.");
+  }
+
+  /**
+   * Analyzes and checks parameters specified for managed Kudu tables.
+   */
+  private void analyzeManagedKuduTableParams(Analyzer analyzer) throws AnalysisException {
+    // If no Kudu table name is specified in tblproperties, generate one using the
+    // current database as a prefix to avoid conflicts in Kudu.
+    if (!getTblProperties().containsKey(KuduTable.KEY_TABLE_NAME)) {
+      getTblProperties().put(KuduTable.KEY_TABLE_NAME,
+          KuduUtil.getDefaultCreateKuduTableName(getDb(), getTbl()));
+    }
+    AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_KEY_COLUMNS),
+        String.format("PRIMARY KEY must be used instead of the table property '%s'.",
+            KuduTable.KEY_KEY_COLUMNS));
+    if (!hasPrimaryKey()) {
+      throw new AnalysisException("A primary key is required for a Kudu table.");
+    }
+    String tabletReplicas = getTblProperties().get(KuduTable.KEY_TABLET_REPLICAS);
+    if (tabletReplicas != null) {
+      Integer r = Ints.tryParse(tabletReplicas);
+      if (r == null) {
+        throw new AnalysisException(String.format(
+            "Table property '%s' must be an integer.", KuduTable.KEY_TABLET_REPLICAS));
+      }
+      if (r <= 0) {
+        throw new AnalysisException("Number of tablet replicas must be greater than " +
+            "zero. Given number of replicas is: " + r.toString());
       }
-      if (!colNames.add(colDef.getColName().toLowerCase())) {
-        throw new AnalysisException("Duplicate column name: " + colDef.getColName());
+    }
+
+    if (!getDistributeParams().isEmpty()) {
+      analyzeDistributeParams(analyzer);
+    } else {
+      throw new AnalysisException("Table distribution must be specified for " +
+          "managed Kudu tables.");
+    }
+  }
+
+  /**
+   * Analyzes the distribution schemes specified in the CREATE TABLE statement.
+   */
+  private void analyzeDistributeParams(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkState(getFileFormat() == THdfsFileFormat.KUDU);
+    Map<String, ColumnDef> pkColDefsByName =
+        ColumnDef.mapByColumnNames(getPrimaryKeyColumnDefs());
+    for (DistributeParam distributeParam: getDistributeParams()) {
+      // If no column names were specified in this distribution scheme, use all the
+      // primary key columns.
+      if (!distributeParam.hasColumnNames()) {
+        distributeParam.setColumnNames(pkColDefsByName.keySet());
       }
+      distributeParam.setPkColumnDefMap(pkColDefsByName);
+      distributeParam.analyze(analyzer);
     }
   }
 
   /**
-   * Analyzes the Avro schema and compares it with the columnDefs_ to detect
+   * Checks if a primary key is specified in a CREATE TABLE stmt. Should only be called
+   * after tableDef_ has been analyzed.
+   */
+  private boolean hasPrimaryKey() {
+    Preconditions.checkState(tableDef_.isAnalyzed());
+    return !tableDef_.getPrimaryKeyColumnDefs().isEmpty();
+  }
+
+  /**
+   * Analyzes the Avro schema and compares it with the column definitions to detect
    * inconsistencies. Returns a list of column descriptors that should be
-   * used for creating the table (possibly identical to columnDefs_).
+   * used for creating the table (possibly identical to getColumnDefs()).
    */
-  private List<ColumnDef> analyzeAvroSchema(Analyzer analyzer)
-      throws AnalysisException {
-    Preconditions.checkState(fileFormat_ == THdfsFileFormat.AVRO);
+  private List<ColumnDef> analyzeAvroSchema(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkState(getFileFormat() == THdfsFileFormat.AVRO);
     // Look for the schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
     // taking precedence.
     List<Map<String, String>> schemaSearchLocations = Lists.newArrayList();
-    schemaSearchLocations.add(serdeProperties_);
-    schemaSearchLocations.add(tblProperties_);
-    String avroSchema = null;
-    List<ColumnDef> avroCols = null; // parsed from avroSchema
+    schemaSearchLocations.add(getSerdeProperties());
+    schemaSearchLocations.add(getTblProperties());
+    String avroSchema;
+    List<ColumnDef> avroCols; // parsed from avroSchema
     try {
       avroSchema = AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
       if (avroSchema == null) {
         // No Avro schema was explicitly set in the serde or table properties, so infer
         // the Avro schema from the column definitions.
         Schema inferredSchema = AvroSchemaConverter.convertColumnDefs(
-            columnDefs_, tableName_.toString());
+            getColumnDefs(), getTblName().toString());
         avroSchema = inferredSchema.toString();
       }
       if (Strings.isNullOrEmpty(avroSchema)) {
         throw new AnalysisException("Avro schema is null or empty: " +
-            tableName_.toString());
+            getTblName().toString());
       }
       avroCols = AvroSchemaParser.parse(avroSchema);
     } catch (SchemaParseException e) {
       throw new AnalysisException(String.format(
-          "Error parsing Avro schema for table '%s': %s", tableName_.toString(),
+          "Error parsing Avro schema for table '%s': %s", getTblName().toString(),
           e.getMessage()));
     }
     Preconditions.checkNotNull(avroCols);
 
-    // Analyze the Avro schema to detect inconsistencies with the columnDefs_.
+    // Analyze the Avro schema to detect inconsistencies with the column definitions.
     // In case of inconsistencies, the column defs are ignored in favor of the Avro
     // schema for simplicity and, in particular, to enable COMPUTE STATS (IMPALA-1104).
     StringBuilder warning = new StringBuilder();
     List<ColumnDef> reconciledColDefs =
-        AvroSchemaUtils.reconcileSchemas(columnDefs_, avroCols, warning);
+        AvroSchemaUtils.reconcileSchemas(getColumnDefs(), avroCols, warning);
     if (warning.length() > 0) analyzer.addWarning(warning.toString());
     return reconciledColDefs;
   }
 
-  private void analyzeKuduTable(Analyzer analyzer) throws AnalysisException {
-    // Validate that Kudu table is correctly specified.
-    if (!KuduTable.tableParamsAreValid(getTblProperties())) {
-      throw new AnalysisException("Kudu table is missing parameters " +
-          String.format("in table properties. Please verify if %s, %s, and %s are "
-                  + "present and have valid values.",
-              KuduTable.KEY_TABLE_NAME, KuduTable.KEY_MASTER_ADDRESSES,
-              KuduTable.KEY_KEY_COLUMNS));
-    }
-
-    // Kudu table cannot be a cached table
-    if (cachingOp_ != null) {
-      throw new AnalysisException("A Kudu table cannot be cached in HDFS.");
-    }
-
-    if (distributeParams_ != null) {
-      if (isExternal_) {
-        throw new AnalysisException(
-            "The DISTRIBUTE BY clause may not be specified for external tables.");
-      }
-
-      List<String> keyColumns = KuduUtil.parseKeyColumnsAsList(
-          getTblProperties().get(KuduTable.KEY_KEY_COLUMNS));
-      for (DistributeParam d : distributeParams_) {
-        // If the columns are not set, default to all key columns
-        if (d.getColumns() == null) d.setColumns(keyColumns);
-        d.analyze(analyzer);
-      }
-    } else if (!isExternal_) {
-      throw new AnalysisException(
-          "A data distribution must be specified using the DISTRIBUTE BY clause.");
-    }
-  }
-
-  private Byte analyzeRowFormatValue(String value) throws AnalysisException {
-    if (value == null) return null;
-    Byte byteVal = HdfsStorageDescriptor.parseDelim(value);
-    if (byteVal == null) {
-      throw new AnalysisException("ESCAPED BY values and LINE/FIELD " +
-          "terminators must be specified as a single character or as a decimal " +
-          "value in the range [-128:127]: " + value);
-    }
-    return byteVal;
-  }
-
   /**
    * Unescapes all values in the property map.
    */
-  public static void unescapeProperties(Map<String, String> propertyMap) {
+  static void unescapeProperties(Map<String, String> propertyMap) {
     if (propertyMap == null) return;
     for (Map.Entry<String, String> kv : propertyMap.entrySet()) {
       propertyMap.put(kv.getKey(),
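
For readers following the new Kudu analysis rules above, they correspond to
DDL of roughly the following shape. This is a sketch: table and column names
are illustrative, and 'kudu.table_name' is assumed to be the property string
behind KuduTable.KEY_TABLE_NAME.

    -- Managed Kudu table: PRIMARY KEY and DISTRIBUTE BY are required, and
    -- STORED AS KUDU replaces the old storage-handler table property.
    CREATE TABLE customers (
      id BIGINT PRIMARY KEY,
      name STRING
    )
    DISTRIBUTE BY HASH (id) INTO 16 BUCKETS
    STORED AS KUDU;

    -- External Kudu table: column definitions, primary keys, and DISTRIBUTE BY
    -- must be omitted; the schema is read from the underlying Kudu table.
    CREATE EXTERNAL TABLE customers_ext
    STORED AS KUDU
    TBLPROPERTIES ('kudu.table_name' = 'customers');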

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java b/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
index 319fe50..34bed86 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
@@ -17,19 +17,20 @@
 
 package org.apache.impala.analysis;
 
-import java.math.BigDecimal;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TDistributeByHashParam;
 import org.apache.impala.thrift.TDistributeByRangeParam;
 import org.apache.impala.thrift.TDistributeParam;
-import org.apache.impala.thrift.TDistributeType;
 import org.apache.impala.thrift.TRangeLiteral;
 import org.apache.impala.thrift.TRangeLiteralList;
+import org.apache.impala.util.KuduUtil;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
 /**
@@ -50,16 +51,16 @@ public class DistributeParam implements ParseNode {
   /**
    * Creates a DistributeParam partitioned by hash.
    */
-  public static DistributeParam createHashParam(List<String> cols, BigDecimal buckets) {
-    return new DistributeParam(Type.HASH, cols, buckets);
+  public static DistributeParam createHashParam(List<String> cols, int buckets) {
+    return new DistributeParam(Type.HASH, cols, buckets, null);
   }
 
   /**
    * Creates a DistributeParam partitioned by range.
    */
   public static DistributeParam createRangeParam(List<String> cols,
-      ArrayList<ArrayList<LiteralExpr>> splitRows) {
-    return new DistributeParam(Type.RANGE, cols, splitRows);
+      List<List<LiteralExpr>> splitRows) {
+    return new DistributeParam(Type.RANGE, cols, NO_BUCKETS, splitRows);
   }
 
   private static final int NO_BUCKETS = -1;
@@ -69,131 +70,159 @@ public class DistributeParam implements ParseNode {
    */
   public enum Type {
     HASH, RANGE
-  };
+  }
+
+  // May be empty, indicating that all primary key columns should be used.
+  private final List<String> colNames_ = Lists.newArrayList();
 
-  private List<String> columns_;
+  // Map of primary key column names to the associated column definitions. Must be set
+  // before the call to analyze().
+  private Map<String, ColumnDef> pkColumnDefByName_;
 
+  // Distribution type
   private final Type type_;
 
   // Only relevant for hash partitioning, -1 otherwise
-  private final int num_buckets_;
+  private final int numBuckets_;
 
   // Only relevant for range partitioning, null otherwise
-  private final ArrayList<ArrayList<LiteralExpr>> splitRows_;
-
-  // Set in analyze()
-  private TDistributeByRangeParam rangeParam_;
-
-  private DistributeParam(Type t, List<String> cols, BigDecimal buckets) {
-    type_ = t;
-    columns_ = cols;
-    num_buckets_ = buckets.intValue();
-    splitRows_ = null;
-  }
+  private final List<List<LiteralExpr>> splitRows_;
 
-  private DistributeParam(Type t, List<String> cols,
-      ArrayList<ArrayList<LiteralExpr>> splitRows) {
+  private DistributeParam(Type t, List<String> colNames, int buckets,
+      List<List<LiteralExpr>> splitRows) {
     type_ = t;
-    columns_ = cols;
+    for (String name: colNames) colNames_.add(name.toLowerCase());
+    numBuckets_ = buckets;
     splitRows_ = splitRows;
-    num_buckets_ = NO_BUCKETS;
   }
 
-  /**
-   * TODO Refactor the logic below to analyze 'columns_'. This analysis should output
-   * a vector of column types that would then be used during the analysis of the split
-   * rows.
-   */
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
-    if (type_ == Type.HASH && num_buckets_ <= 1) {
-      throw new AnalysisException(String.format(
-          "Number of buckets in DISTRIBUTE BY clause '%s' must be larger than 1.",
-          toSql()));
-    } else if (type_ == Type.RANGE) {
-      // Creating the thrift structure simultaneously checks for semantic errors
-      rangeParam_ = new TDistributeByRangeParam();
-      rangeParam_.setColumns(columns_);
-
-      for (ArrayList<LiteralExpr> splitRow : splitRows_) {
-        TRangeLiteralList list = new TRangeLiteralList();
-        if (splitRow.size() != columns_.size()) {
+    Preconditions.checkState(!colNames_.isEmpty());
+    Preconditions.checkNotNull(pkColumnDefByName_);
+    Preconditions.checkState(!pkColumnDefByName_.isEmpty());
+    // Validate the columns specified in the DISTRIBUTE BY clause
+    for (String colName: colNames_) {
+      if (!pkColumnDefByName_.containsKey(colName)) {
+        throw new AnalysisException(String.format("Column '%s' in '%s' is not a key " +
+            "column. Only key columns can be used in DISTRIBUTE BY.", colName, toSql()));
+      }
+    }
+
+    if (type_ == Type.RANGE) {
+      for (List<LiteralExpr> splitRow : splitRows_) {
+        if (splitRow.size() != colNames_.size()) {
           throw new AnalysisException(String.format(
               "SPLIT ROWS has different size than number of projected key columns: %d. "
-                  + "Split row: %s", columns_.size(), splitRowToString(splitRow)));
+                  + "Split row: %s", colNames_.size(), splitRowToString(splitRow)));
         }
-        for (LiteralExpr expr : splitRow) {
+        for (int i = 0; i < splitRow.size(); ++i) {
+          LiteralExpr expr = splitRow.get(i);
+          ColumnDef colDef = pkColumnDefByName_.get(colNames_.get(i));
+          org.apache.impala.catalog.Type colType = colDef.getType();
+          Preconditions.checkState(KuduUtil.isSupportedKeyType(colType));
           expr.analyze(analyzer);
-          TRangeLiteral literal = new TRangeLiteral();
-          if (expr instanceof NumericLiteral) {
-            NumericLiteral num = (NumericLiteral) expr;
-            if (num.getType().isDecimal() || num.getType().isFloatingPointType()) {
-              throw new AnalysisException("Only integral and string values allowed for" +
-                  " split rows.");
-            } else {
-              literal.setInt_literal(num.getIntValue());
-            }
-          } else if (expr instanceof StringLiteral) {
-            StringLiteral string = (StringLiteral) expr;
-            literal.setString_literal(string.getStringValue());
-          } else if (expr instanceof BoolLiteral) {
-            BoolLiteral bool = (BoolLiteral) expr;
-            literal.setBool_literal(bool.getValue());
-          } else {
-            throw new AnalysisException(String.format("Split row value is not supported: "
-                + "%s (Type: %s).", expr.getStringValue(), expr.getType().toSql()));
+          org.apache.impala.catalog.Type exprType = expr.getType();
+          if (exprType.isNull()) {
+            throw new AnalysisException("Split values cannot be NULL. Split row: " +
+                splitRowToString(splitRow));
+          }
+          if (!org.apache.impala.catalog.Type.isImplicitlyCastable(exprType, colType,
+              true)) {
+            throw new AnalysisException(String.format("Split value %s (type: %s) is " +
+                "not type compatible with column '%s' (type: %s).", expr.toSql(),
+                exprType, colDef.getColName(), colType.toSql()));
           }
-          list.addToValues(literal);
         }
-        rangeParam_.addToSplit_rows(list);
       }
     }
   }
 
   @Override
   public String toSql() {
-    if (num_buckets_ == NO_BUCKETS) {
-      List<String> splitRowStrings = Lists.newArrayList();
-      for (ArrayList<LiteralExpr> splitRow : splitRows_) {
-        splitRowStrings.add(splitRowToString(splitRow));
-      }
-      return String.format("RANGE(%s) INTO RANGES(%s)", Joiner.on(", ").join(columns_),
-          Joiner.on(", ").join(splitRowStrings));
+    StringBuilder builder = new StringBuilder(type_.toString());
+    if (!colNames_.isEmpty()) {
+      builder.append(" (");
+      Joiner.on(", ").appendTo(builder, colNames_).append(")");
+    }
+    if (type_ == Type.HASH) {
+      builder.append(" INTO ");
+      Preconditions.checkState(numBuckets_ != NO_BUCKETS);
+      builder.append(numBuckets_).append(" BUCKETS");
     } else {
-      return String.format("HASH(%s) INTO %d BUCKETS", Joiner.on(", ").join(columns_),
-          num_buckets_);
+      builder.append(" SPLIT ROWS (");
+      if (splitRows_ == null) {
+        builder.append("...");
+      } else {
+        // Join the split rows with commas so the output is valid SQL.
+        String separator = "";
+        for (List<LiteralExpr> splitRow: splitRows_) {
+          builder.append(separator).append(splitRowToString(splitRow));
+          separator = ", ";
+        }
+      }
+      builder.append(")");
     }
+    return builder.toString();
   }
 
-  private String splitRowToString(ArrayList<LiteralExpr> splitRow) {
-    StringBuilder builder = new StringBuilder();
-    builder.append("(");
-    List<String> rangeElementStrings = Lists.newArrayList();
-    for (LiteralExpr rangeElement : splitRow) {
-      rangeElementStrings.add(rangeElement.toSql());
+  @Override
+  public String toString() { return toSql(); }
+
+  private String splitRowToString(List<LiteralExpr> splitRow) {
+    StringBuilder builder = new StringBuilder("(");
+    for (LiteralExpr expr: splitRow) {
+      if (builder.length() > 1) builder.append(", ");
+      builder.append(expr.toSql());
     }
-    builder.append(Joiner.on(", ").join(rangeElementStrings));
-    builder.append(")");
-    return builder.toString();
+    return builder.append(")").toString();
   }
 
-  TDistributeParam toThrift() {
+  public TDistributeParam toThrift() {
     TDistributeParam result = new TDistributeParam();
+    // TODO: Add a validate() function to ensure the validity of distribute params.
     if (type_ == Type.HASH) {
       TDistributeByHashParam hash = new TDistributeByHashParam();
-      hash.setNum_buckets(num_buckets_);
-      hash.setColumns(columns_);
+      Preconditions.checkState(numBuckets_ != NO_BUCKETS);
+      hash.setNum_buckets(numBuckets_);
+      hash.setColumns(colNames_);
       result.setBy_hash_param(hash);
     } else {
       Preconditions.checkState(type_ == Type.RANGE);
-
-      result.setBy_range_param(rangeParam_);
+      TDistributeByRangeParam rangeParam = new TDistributeByRangeParam();
+      rangeParam.setColumns(colNames_);
+      if (splitRows_ == null) {
+        result.setBy_range_param(rangeParam);
+        return result;
+      }
+      for (List<LiteralExpr> splitRow : splitRows_) {
+        TRangeLiteralList list = new TRangeLiteralList();
+        for (int i = 0; i < splitRow.size(); ++i) {
+          LiteralExpr expr = splitRow.get(i);
+          TRangeLiteral literal = new TRangeLiteral();
+          if (expr instanceof NumericLiteral) {
+            literal.setInt_literal(((NumericLiteral)expr).getIntValue());
+          } else {
+            String exprValue = expr.getStringValue();
+            Preconditions.checkState(!Strings.isNullOrEmpty(exprValue));
+            literal.setString_literal(exprValue);
+          }
+          list.addToValues(literal);
+        }
+        rangeParam.addToSplit_rows(list);
+      }
+      result.setBy_range_param(rangeParam);
     }
     return result;
   }
 
-  public List<String> getColumns() { return columns_; }
-  public void setColumns(List<String> cols) { columns_ = cols; }
-  public Type getType_() { return type_; }
-  public int getNumBuckets() { return num_buckets_; }
+  void setPkColumnDefMap(Map<String, ColumnDef> pkColumnDefByName) {
+    pkColumnDefByName_ = pkColumnDefByName;
+  }
+
+  boolean hasColumnNames() { return !colNames_.isEmpty(); }
+
+  void setColumnNames(Collection<String> colNames) {
+    Preconditions.checkState(colNames_.isEmpty());
+    colNames_.addAll(colNames);
+  }
+
+  public Type getType() { return type_; }
 }
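
DistributeParam's analysis above restricts DISTRIBUTE BY to primary key
columns and type-checks each SPLIT ROWS value against its key column. A
sketch of both schemes on a hypothetical table, assuming the grammar's
comma-separated combination of distribution schemes:

    CREATE TABLE metrics (
      host STRING,
      ts BIGINT,
      value DOUBLE,
      PRIMARY KEY (host, ts)
    )
    DISTRIBUTE BY HASH (host) INTO 4 BUCKETS,
                  RANGE (ts) SPLIT ROWS ((1000), (2000))
    STORED AS KUDU;

    -- Rejected, because 'value' is not a key column:
    --   Column 'value' in 'HASH (value) INTO 4 BUCKETS' is not a key column.
    --   Only key columns can be used in DISTRIBUTE BY.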

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index 24b8417..28de1a8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -190,7 +190,7 @@ public abstract class ModifyStmt extends StatementBase {
 
     // cast result expressions to the correct type of the referenced slot of the
     // target table
-    int keyColumnsOffset = table_.getKuduKeyColumnNames().size();
+    int keyColumnsOffset = table_.getPrimaryKeyColumnNames().size();
     for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) {
       sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo(
           assignments_.get(i - keyColumnsOffset).first.getType()));
@@ -225,7 +225,7 @@ public abstract class ModifyStmt extends StatementBase {
     }
 
     // Add the key columns as slot refs
-    for (String k : table_.getKuduKeyColumnNames()) {
+    for (String k : table_.getPrimaryKeyColumnNames()) {
       ArrayList<String> path = Path.createRawPath(targetTableRef_.getUniqueAlias(), k);
       SlotRef ref = new SlotRef(path);
       ref.analyze(analyzer);
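
The two hunks above only rename the accessor, since Kudu key columns are now
ordinary primary key columns; the UPDATE/DELETE analysis is otherwise
unchanged. For context, the source statement's result exprs begin with the
primary key columns, followed by the assigned values, which are cast to the
target column types. A hypothetical statement that takes this path:

    UPDATE metrics SET value = 0 WHERE host = 'h1';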

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
new file mode 100644
index 0000000..4d3ed80
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Represents the PARTITIONED BY and DISTRIBUTE BY clauses of a DDL statement.
+ * TODO: Reconsider this class when we add support for new range partitioning syntax (see
+ * IMPALA-3724).
+ */
+class TableDataLayout {
+
+  private final List<ColumnDef> partitionColDefs_;
+  private final List<DistributeParam> distributeParams_;
+
+  private TableDataLayout(List<ColumnDef> partitionColumnDefs,
+      List<DistributeParam> distributeParams) {
+    partitionColDefs_ = partitionColumnDefs;
+    distributeParams_ = distributeParams;
+  }
+
+  static TableDataLayout createPartitionedLayout(List<ColumnDef> partitionColumnDefs) {
+    return new TableDataLayout(partitionColumnDefs,
+        Lists.<DistributeParam>newArrayList());
+  }
+
+  static TableDataLayout createDistributedLayout(List<DistributeParam> distributeParams) {
+    return new TableDataLayout(Lists.<ColumnDef>newArrayList(), distributeParams);
+  }
+
+  static TableDataLayout createEmptyLayout() {
+    return new TableDataLayout(Lists.<ColumnDef>newArrayList(),
+        Lists.<DistributeParam>newArrayList());
+  }
+
+  List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; }
+  List<DistributeParam> getDistributeParams() { return distributeParams_; }
+}
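
TableDataLayout simply carries whichever of the two mutually exclusive
layouts the parser produced. Roughly, with hypothetical tables:

    -- createPartitionedLayout(): HDFS-style partitioning.
    CREATE TABLE logs (msg STRING)
    PARTITIONED BY (dt STRING)
    STORED AS PARQUET;

    -- createDistributedLayout(): Kudu-style distribution.
    CREATE TABLE events (id BIGINT PRIMARY KEY, payload STRING)
    DISTRIBUTE BY HASH (id) INTO 8 BUCKETS
    STORED AS KUDU;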

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/TableDef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
new file mode 100644
index 0000000..ce08e36
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.HdfsStorageDescriptor;
+import org.apache.impala.catalog.RowFormat;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.thrift.TAccessEvent;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.THdfsFileFormat;
+import org.apache.impala.util.MetaStoreUtil;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.hadoop.fs.permission.FsAction;
+
+/**
+ * Represents the table parameters in a CREATE TABLE statement. These parameters
+ * correspond to the following clauses in a CREATE TABLE statement:
+ * - EXTERNAL
+ * - IF NOT EXISTS
+ * - PARTITIONED BY
+ * - DISTRIBUTE BY
+ * - ROW FORMAT
+ * - STORED AS
+ * - COMMENT
+ * - SERDEPROPERTIES
+ * - TBLPROPERTIES
+ * - LOCATION
+ * - CACHED IN
+ */
+class TableDef {
+
+  // Name of the new table
+  private final TableName tableName_;
+
+  // List of column definitions
+  private final List<ColumnDef> columnDefs_ = Lists.newArrayList();
+
+  // Names of primary key columns. Populated by the parser. An empty list doesn't mean
+  // no primary keys were specified, as columnDefs_ may also declare primary keys.
+  private final List<String> primaryKeyColNames_ = Lists.newArrayList();
+
+  // Authoritative list of primary key column definitions populated during analysis.
+  private final List<ColumnDef> primaryKeyColDefs_ = Lists.newArrayList();
+
+  // If true, the table's data will be preserved if dropped.
+  private final boolean isExternal_;
+
+  // If true, no errors are thrown if the table already exists.
+  private final boolean ifNotExists_;
+
+  // PARTITIONED BY / DISTRIBUTE BY parameters.
+  private final TableDataLayout dataLayout_;
+
+  // True if analyze() has been called.
+  private boolean isAnalyzed_ = false;
+
+  /**
+   * Set of table options. These options are grouped together for convenience while
+   * parsing CREATE TABLE statements. They typically appear at the end of the statement.
+   */
+  static class Options {
+    // Comment to attach to the table
+    final String comment;
+
+    // Custom row format of the table. Leave null to specify default row format.
+    final RowFormat rowFormat;
+
+    // Key/values to persist with table serde metadata.
+    final Map<String, String> serdeProperties;
+
+    // File format of the table
+    final THdfsFileFormat fileFormat;
+
+    // The HDFS location where the table data will be stored.
+    final HdfsUri location;
+
+    // The HDFS caching op that should be applied to this table.
+    final HdfsCachingOp cachingOp;
+
+    // Key/values to persist with table metadata.
+    final Map<String, String> tblProperties;
+
+    Options(String comment, RowFormat rowFormat,
+        Map<String, String> serdeProperties, THdfsFileFormat fileFormat, HdfsUri location,
+        HdfsCachingOp cachingOp, Map<String, String> tblProperties) {
+      this.comment = comment;
+      this.rowFormat = rowFormat;
+      Preconditions.checkNotNull(serdeProperties);
+      this.serdeProperties = serdeProperties;
+      this.fileFormat = fileFormat == null ? THdfsFileFormat.TEXT : fileFormat;
+      this.location = location;
+      this.cachingOp = cachingOp;
+      Preconditions.checkNotNull(tblProperties);
+      this.tblProperties = tblProperties;
+    }
+
+    public Options(String comment) {
+      this(comment, RowFormat.DEFAULT_ROW_FORMAT, Maps.<String, String>newHashMap(),
+          THdfsFileFormat.TEXT, null, null, Maps.<String, String>newHashMap());
+    }
+  }
+
+  private Options options_;
+
+  // Result of analysis.
+  private TableName fqTableName_;
+
+  TableDef(TableName tableName, boolean isExternal, boolean ifNotExists) {
+    tableName_ = tableName;
+    isExternal_ = isExternal;
+    ifNotExists_ = ifNotExists;
+    dataLayout_ = TableDataLayout.createEmptyLayout();
+  }
+
+  public TableName getTblName() {
+    return fqTableName_ != null ? fqTableName_ : tableName_;
+  }
+  public String getTbl() { return tableName_.getTbl(); }
+  public boolean isAnalyzed() { return isAnalyzed_; }
+  List<ColumnDef> getColumnDefs() { return columnDefs_; }
+  List<ColumnDef> getPartitionColumnDefs() {
+    return dataLayout_.getPartitionColumnDefs();
+  }
+  List<String> getPrimaryKeyColumnNames() { return primaryKeyColNames_; }
+  List<ColumnDef> getPrimaryKeyColumnDefs() { return primaryKeyColDefs_; }
+  boolean isExternal() { return isExternal_; }
+  boolean getIfNotExists() { return ifNotExists_; }
+  List<DistributeParam> getDistributeParams() {
+    return dataLayout_.getDistributeParams();
+  }
+  void setOptions(Options options) {
+    Preconditions.checkNotNull(options);
+    options_ = options;
+  }
+  String getComment() { return options_.comment; }
+  Map<String, String> getTblProperties() { return options_.tblProperties; }
+  HdfsCachingOp getCachingOp() { return options_.cachingOp; }
+  HdfsUri getLocation() { return options_.location; }
+  Map<String, String> getSerdeProperties() { return options_.serdeProperties; }
+  THdfsFileFormat getFileFormat() { return options_.fileFormat; }
+  RowFormat getRowFormat() { return options_.rowFormat; }
+
+  /**
+   * Analyzes the parameters of a CREATE TABLE statement.
+   */
+  void analyze(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
+    fqTableName_ = analyzer.getFqTableName(getTblName());
+    fqTableName_.analyze();
+    analyzeColumnDefs();
+    analyzePrimaryKeys();
+
+    if (analyzer.dbContainsTable(getTblName().getDb(), getTbl(), Privilege.CREATE)
+        && !getIfNotExists()) {
+      throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG + getTblName());
+    }
+
+    analyzer.addAccessEvent(new TAccessEvent(getTblName().toString(),
+        TCatalogObjectType.TABLE, Privilege.CREATE.toString()));
+
+    Preconditions.checkNotNull(options_);
+    analyzeOptions(analyzer);
+    isAnalyzed_ = true;
+  }
+
+  /**
+   * Analyzes table and partition column definitions, checking whether all column
+   * names are unique.
+   */
+  private void analyzeColumnDefs() throws AnalysisException {
+    Set<String> colNames = Sets.newHashSet();
+    for (ColumnDef colDef: columnDefs_) {
+      colDef.analyze();
+      if (!colNames.add(colDef.getColName().toLowerCase())) {
+        throw new AnalysisException("Duplicate column name: " + colDef.getColName());
+      }
+    }
+    for (ColumnDef colDef: getPartitionColumnDefs()) {
+      colDef.analyze();
+      if (!colDef.getType().supportsTablePartitioning()) {
+        throw new AnalysisException(
+            String.format("Type '%s' is not supported as partition-column type " +
+                "in column: %s", colDef.getType().toSql(), colDef.getColName()));
+      }
+      if (!colNames.add(colDef.getColName().toLowerCase())) {
+        throw new AnalysisException("Duplicate column name: " + colDef.getColName());
+      }
+    }
+  }
+
+  /**
+   * Analyzes the primary key columns. Checks if the specified primary key columns exist
+   * in the table column definitions and if composite primary keys are properly defined
+   * using the PRIMARY KEY (col1, col2, ...) clause.
+   */
+  private void analyzePrimaryKeys() throws AnalysisException {
+    for (ColumnDef colDef: columnDefs_) {
+      if (colDef.isPrimaryKey()) primaryKeyColDefs_.add(colDef);
+    }
+    if (primaryKeyColDefs_.size() > 1) {
+      throw new AnalysisException("Multiple primary keys specified. " +
+          "Composite primary keys can be specified using the " +
+          "PRIMARY KEY (col1, col2, ...) syntax at the end of the column definition.");
+    }
+    if (primaryKeyColNames_.isEmpty()) return;
+    if (!primaryKeyColDefs_.isEmpty()) {
+      throw new AnalysisException("Multiple primary keys specified. " +
+          "Composite primary keys can be specified using the " +
+          "PRIMARY KEY (col1, col2, ...) syntax at the end of the column definition.");
+    }
+    Map<String, ColumnDef> colDefsByColName = ColumnDef.mapByColumnNames(columnDefs_);
+    for (String colName: primaryKeyColNames_) {
+      colName = colName.toLowerCase();
+      ColumnDef colDef = colDefsByColName.remove(colName);
+      if (colDef == null) {
+        if (ColumnDef.toColumnNames(primaryKeyColDefs_).contains(colName)) {
+          throw new AnalysisException(String.format("Column '%s' is listed multiple " +
+              "times as a PRIMARY KEY.", colName));
+        }
+        throw new AnalysisException(String.format(
+            "PRIMARY KEY column '%s' does not exist in the table", colName));
+      }
+      primaryKeyColDefs_.add(colDef);
+    }
+  }
+
+  private void analyzeOptions(Analyzer analyzer) throws AnalysisException {
+    MetaStoreUtil.checkShortPropertyMap("Property", options_.tblProperties);
+    MetaStoreUtil.checkShortPropertyMap("Serde property", options_.serdeProperties);
+
+    if (options_.location != null) {
+      options_.location.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
+    }
+
+    if (options_.cachingOp != null) {
+      options_.cachingOp.analyze(analyzer);
+      if (options_.cachingOp.shouldCache() && options_.location != null &&
+          !FileSystemUtil.isPathCacheable(options_.location.getPath())) {
+        throw new AnalysisException(String.format("Location '%s' cannot be cached. " +
+            "Please retry without caching: CREATE TABLE ... UNCACHED",
+            options_.location));
+      }
+    }
+
+    // Analyze the 'skip.header.line.count' property.
+    AlterTableSetTblProperties.analyzeSkipHeaderLineCount(options_.tblProperties);
+    analyzeRowFormat(analyzer);
+  }
+
+  private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException {
+    if (options_.rowFormat == null) return;
+    if (options_.fileFormat == THdfsFileFormat.KUDU) {
+      throw new AnalysisException(String.format(
+          "ROW FORMAT cannot be specified for file format %s.", options_.fileFormat));
+    }
+
+    Byte fieldDelim = analyzeRowFormatValue(options_.rowFormat.getFieldDelimiter());
+    Byte lineDelim = analyzeRowFormatValue(options_.rowFormat.getLineDelimiter());
+    Byte escapeChar = analyzeRowFormatValue(options_.rowFormat.getEscapeChar());
+    if (options_.fileFormat == THdfsFileFormat.TEXT) {
+      if (fieldDelim == null) fieldDelim = HdfsStorageDescriptor.DEFAULT_FIELD_DELIM;
+      if (lineDelim == null) lineDelim = HdfsStorageDescriptor.DEFAULT_LINE_DELIM;
+      if (escapeChar == null) escapeChar = HdfsStorageDescriptor.DEFAULT_ESCAPE_CHAR;
+      if (fieldDelim.equals(lineDelim)) {
+        throw new AnalysisException("Field delimiter and line delimiter have the " +
+            "same value: byte " + fieldDelim);
+      }
+      if (fieldDelim.equals(escapeChar)) {
+        analyzer.addWarning("Field delimiter and escape character have the same " +
+            "value: byte " + fieldDelim + ". Escape character will be ignored");
+      }
+      if (lineDelim.equals(escapeChar)) {
+        analyzer.addWarning("Line delimiter and escape character have the same " +
+            "value: byte " + lineDelim + ". Escape character will be ignored");
+      }
+    }
+  }
+
+  private Byte analyzeRowFormatValue(String value) throws AnalysisException {
+    if (value == null) return null;
+    Byte byteVal = HdfsStorageDescriptor.parseDelim(value);
+    if (byteVal == null) {
+      throw new AnalysisException("ESCAPED BY values and LINE/FIELD " +
+          "terminators must be specified as a single character or as a decimal " +
+          "value in the range [-128:127]: " + value);
+    }
+    return byteVal;
+  }
+}
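
An analyzer-level sketch of the rules enforced by analyzePrimaryKeys() above,
written in the style of Impala's FE tests (this assumes the AnalyzesOk/
AnalysisError helpers from FrontendTestBase; table names, column names, and the
exact DDL below are illustrative, not copied from this patch's tests):

  @Test
  public void TestPrimaryKeySketch() {
    // A single inline PRIMARY KEY is accepted.
    AnalyzesOk("create table pk_tbl (id int primary key, x int) " +
        "distribute by hash (id) into 3 buckets stored as kudu");

    // Composite keys use the trailing PRIMARY KEY (...) clause.
    AnalyzesOk("create table pk_tbl (a int, b int, primary key (a, b)) " +
        "distribute by hash (a) into 3 buckets stored as kudu");

    // Two inline markers are rejected by the size() > 1 check.
    AnalysisError("create table pk_tbl (a int primary key, b int primary key) " +
        "distribute by hash (a) into 3 buckets stored as kudu",
        "Multiple primary keys specified.");

    // Mixing an inline marker with the trailing clause hits the second check.
    AnalysisError("create table pk_tbl (a int primary key, b int, primary key (b)) " +
        "distribute by hash (a) into 3 buckets stored as kudu",
        "Multiple primary keys specified.");

    // The clause may only name existing columns.
    AnalysisError("create table pk_tbl (a int, primary key (z)) " +
        "distribute by hash (a) into 3 buckets stored as kudu",
        "PRIMARY KEY column 'z' does not exist in the table");
  }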

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index b125987..aa24336 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -22,10 +22,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.impala.catalog.KuduTable;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.antlr.runtime.ANTLRStringStream;
 import org.antlr.runtime.Token;
 import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.parse.HiveLexer;
 
@@ -35,16 +41,11 @@ import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.View;
-import org.apache.impala.common.PrintUtils;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import org.apache.impala.util.KuduUtil;
 
 /**
  * Contains utility methods for creating SQL strings, for example,
@@ -132,8 +133,9 @@ public class ToSqlUtils {
     }
     // TODO: Pass the correct compression, if applicable.
     return getCreateTableSql(stmt.getDb(), stmt.getTbl(), stmt.getComment(), colsSql,
-        partitionColsSql, stmt.getTblProperties(), stmt.getSerdeProperties(),
-        stmt.isExternal(), stmt.getIfNotExists(), stmt.getRowFormat(),
+        partitionColsSql, stmt.getTblPrimaryKeyColumnNames(), null,
+        stmt.getTblProperties(), stmt.getSerdeProperties(), stmt.isExternal(),
+        stmt.getIfNotExists(), stmt.getRowFormat(),
         HdfsFileFormat.fromThrift(stmt.getFileFormat()), HdfsCompression.NONE, null,
         stmt.getLocation());
   }
@@ -152,7 +154,8 @@ public class ToSqlUtils {
     }
     // TODO: Pass the correct compression, if applicable.
     String createTableSql = getCreateTableSql(innerStmt.getDb(), innerStmt.getTbl(),
-        innerStmt.getComment(), null, partitionColsSql, innerStmt.getTblProperties(),
+        innerStmt.getComment(), null, partitionColsSql,
+        innerStmt.getTblPrimaryKeyColumnNames(), null, innerStmt.getTblProperties(),
         innerStmt.getSerdeProperties(), innerStmt.isExternal(),
         innerStmt.getIfNotExists(), innerStmt.getRowFormat(),
         HdfsFileFormat.fromThrift(innerStmt.getFileFormat()), HdfsCompression.NONE, null,
@@ -169,6 +172,9 @@ public class ToSqlUtils {
     if (table instanceof View) return getCreateViewSql((View)table);
     org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable();
     HashMap<String, String> properties = Maps.newHashMap(msTable.getParameters());
+    // 'transient_lastDdlTime' is maintained by the metastore and is not useful
+    // in SHOW CREATE TABLE output; Map.remove() is a no-op when the key is absent.
+    properties.remove("transient_lastDdlTime");
     boolean isExternal = msTable.getTableType() != null &&
         msTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString());
     String comment = properties.get("comment");
@@ -194,17 +200,40 @@ public class ToSqlUtils {
     Map<String, String> serdeParameters = msTable.getSd().getSerdeInfo().getParameters();
 
     String storageHandlerClassName = table.getStorageHandlerClassName();
+    List<String> primaryKeySql = Lists.newArrayList();
+    String kuduDistributeByParams = null;
     if (table instanceof KuduTable) {
+      KuduTable kuduTable = (KuduTable) table;
       // Kudu tables don't use LOCATION syntax
       location = null;
-      format = null;
+      format = HdfsFileFormat.KUDU;
       // Kudu tables cannot use the Hive DDL syntax for the storage handler
       storageHandlerClassName = null;
+      properties.remove(KuduTable.KEY_STORAGE_HANDLER);
+      String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME);
+      Preconditions.checkNotNull(kuduTableName);
+      if (kuduTableName.equals(KuduUtil.getDefaultCreateKuduTableName(
+          table.getDb().getName(), table.getName()))) {
+        properties.remove(KuduTable.KEY_TABLE_NAME);
+      }
+      // Internal property, should not be exposed to the user.
+      properties.remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
+
+      if (!isExternal) {
+        primaryKeySql.addAll(kuduTable.getPrimaryKeyColumnNames());
+
+        List<String> paramsSql = Lists.newArrayList();
+        for (DistributeParam param: kuduTable.getDistributeBy()) {
+          paramsSql.add(param.toSql());
+        }
+        kuduDistributeByParams = Joiner.on(", ").join(paramsSql);
+      }
     }
     HdfsUri tableLocation = location == null ? null : new HdfsUri(location);
     return getCreateTableSql(table.getDb().getName(), table.getName(), comment, colsSql,
-        partitionColsSql, properties, serdeParameters, isExternal, false, rowFormat,
-        format, compression, storageHandlerClassName, tableLocation);
+        partitionColsSql, primaryKeySql, kuduDistributeByParams, properties,
+        serdeParameters, isExternal, false, rowFormat, format, compression,
+        storageHandlerClassName, tableLocation);
   }
 
   /**
@@ -214,6 +243,7 @@ public class ToSqlUtils {
    */
   public static String getCreateTableSql(String dbName, String tableName,
       String tableComment, List<String> columnsSql, List<String> partitionColumnsSql,
+      List<String> primaryKeysSql, String kuduDistributeByParams,
       Map<String, String> tblProperties, Map<String, String> serdeParameters,
       boolean isExternal, boolean ifNotExists, RowFormat rowFormat,
       HdfsFileFormat fileFormat, HdfsCompression compression, String storageHandlerClass,
@@ -227,7 +257,11 @@ public class ToSqlUtils {
     sb.append(tableName);
     if (columnsSql != null) {
       sb.append(" (\n  ");
-      sb.append(Joiner.on(", \n  ").join(columnsSql));
+      sb.append(Joiner.on(",\n  ").join(columnsSql));
+      if (!primaryKeysSql.isEmpty()) {
+        sb.append(",\n  PRIMARY KEY (");
+        Joiner.on(", ").appendTo(sb, primaryKeysSql).append(")");
+      }
       sb.append("\n)");
     }
     sb.append("\n");
@@ -238,6 +272,10 @@ public class ToSqlUtils {
           Joiner.on(", \n  ").join(partitionColumnsSql)));
     }
 
+    if (kuduDistributeByParams != null) {
+      sb.append("DISTRIBUTE BY " + kuduDistributeByParams + "\n");
+    }
+
     if (rowFormat != null && !rowFormat.isDefault()) {
       sb.append("ROW FORMAT DELIMITED");
       if (rowFormat.getFieldDelimiter() != null) {
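
Taken together, the changes in this file make SHOW CREATE TABLE on a managed
Kudu table render the primary key and the distribution scheme inline instead of
as storage handler properties. The result looks roughly like the following
(an illustrative rendering only; names, types, bucket count, and the master
address are made up):

  CREATE TABLE example_db.example_tbl (
    id INT,
    name STRING,
    PRIMARY KEY (id)
  )
  DISTRIBUTE BY HASH (id) INTO 16 BUCKETS
  STORED AS KUDU
  TBLPROPERTIES ('kudu.master_addresses'='127.0.0.1')

Note that for external Kudu tables the primaryKeySql and kuduDistributeByParams
arguments stay empty (see the !isExternal check above), so only the Kudu table
name and master addresses show up in TBLPROPERTIES.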

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 41573ed..733b2f2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -519,4 +519,9 @@ public abstract class Catalog {
     }
     return result;
   }
+
+  /** Returns true if 'dbName' (case-insensitively) refers to the default database. */
+  public static boolean isDefaultDb(String dbName) {
+    return DEFAULT_DB.equals(dbName.toLowerCase());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 41c8d62..149b00b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -23,7 +23,6 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -41,7 +40,6 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FunctionType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.ResourceType;
@@ -52,7 +50,6 @@ import org.apache.log4j.Logger;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.TException;
 
-import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.FileSystemUtil;
@@ -65,7 +62,6 @@ import org.apache.impala.thrift.TCatalog;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPrivilege;
@@ -79,7 +75,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.io.Files;
 
 /**
  * Specialized Catalog that implements the CatalogService specific Catalog
@@ -693,7 +688,7 @@ public class CatalogServiceCatalog extends Catalog {
    * Adds a table with the given name to the catalog and returns the new table,
    * loading the metadata if needed.
    */
-  public Table addTable(String dbName, String tblName) throws TableNotFoundException {
+  public Table addTable(String dbName, String tblName) {
     Db db = getDb(dbName);
     if (db == null) return null;
     Table incompleteTable =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index d6fb185..0ed67c6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -152,6 +152,11 @@ public class Db implements CatalogObject {
     return Lists.newArrayList(tableCache_.keySet());
   }
 
+  /**
+   * Returns the tables in the cache.
+   */
+  public List<Table> getTables() { return tableCache_.getValues(); }
+
   public boolean containsTable(String tableName) {
     return tableCache_.contains(tableName.toLowerCase());
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
index 86a65bd..e4fce60 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
@@ -31,8 +31,12 @@ import com.google.common.collect.Lists;
  * 2) the output format class
  * 3) the serialization library class
  * 4) whether scanning complex types from it is supported
+ * 5) whether the file format can skip complex columns in scans and materialize
+ *    only the scalar-typed columns
  *
  * Important note: Always keep consistent with the classes used in Hive.
+ * TODO: Kudu doesn't belong in this list. Either rename this enum or create a separate
+ * list of storage engines (see IMPALA-4178).
  */
 public enum HdfsFileFormat {
   RC_FILE("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
@@ -57,7 +61,10 @@ public enum HdfsFileFormat {
   PARQUET("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
       "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
       "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
-      true, true);
+      true, true),
+  KUDU("org.apache.kudu.mapreduce.KuduTableInputFormat",
+      "org.apache.kudu.mapreduce.KuduTableOutputFormat",
+      "", false, false);
 
   private final String inputFormat_;
   private final String outputFormat_;
@@ -103,6 +110,7 @@ public enum HdfsFileFormat {
           .put(PARQUET_LEGACY_INPUT_FORMATS[0], PARQUET)
           .put(PARQUET_LEGACY_INPUT_FORMATS[1], PARQUET)
           .put(PARQUET_LEGACY_INPUT_FORMATS[2], PARQUET)
+          .put(KUDU.inputFormat(), KUDU)
           .build();
 
   /**
@@ -138,6 +146,7 @@ public enum HdfsFileFormat {
       case SEQUENCE_FILE: return HdfsFileFormat.SEQUENCE_FILE;
       case AVRO: return HdfsFileFormat.AVRO;
       case PARQUET: return HdfsFileFormat.PARQUET;
+      case KUDU: return HdfsFileFormat.KUDU;
       default:
         throw new RuntimeException("Unknown THdfsFileFormat: "
             + thriftFormat + " - should never happen!");
@@ -151,6 +160,7 @@ public enum HdfsFileFormat {
       case SEQUENCE_FILE: return THdfsFileFormat.SEQUENCE_FILE;
       case AVRO: return THdfsFileFormat.AVRO;
       case PARQUET: return THdfsFileFormat.PARQUET;
+      case KUDU: return THdfsFileFormat.KUDU;
       default:
         throw new RuntimeException("Unknown HdfsFormat: "
             + this + " - should never happen!");
@@ -173,6 +183,7 @@ public enum HdfsFileFormat {
       case SEQUENCE_FILE: return "SEQUENCEFILE";
       case AVRO: return "AVRO";
       case PARQUET: return "PARQUET";
+      case KUDU: return "KUDU";
       default:
         throw new RuntimeException("Unknown HdfsFormat: "
             + this + " - should never happen!");
@@ -230,6 +241,8 @@ public enum HdfsFileFormat {
       case AVRO:
       case PARQUET:
         return true;
+      case KUDU:
+        return false;
       default:
         throw new RuntimeException("Unknown HdfsFormat: "
             + this + " - should never happen!");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index c416bee..3647256 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -87,12 +87,17 @@ public class ImpaladCatalog extends Catalog {
   // Object that is used to synchronize on and signal when a catalog update is received.
   private final Object catalogUpdateEventNotifier_ = new Object();
 
+  // The addresses of the Kudu masters to use if no Kudu masters were explicitly provided.
+  // Used during table creation.
+  private final String defaultKuduMasterHosts_;
+
   /**
    * C'tor used by tests that need to validate the ImpaladCatalog outside of the
    * CatalogServer.
    */
-  public ImpaladCatalog() {
+  public ImpaladCatalog(String defaultKuduMasterHosts) {
     super(false);
+    defaultKuduMasterHosts_ = defaultKuduMasterHosts;
   }
 
   /**
@@ -445,4 +450,5 @@ public class ImpaladCatalog extends Catalog {
   // Only used for testing.
   public void setIsReady(boolean isReady) { isReady_.set(isReady); }
   public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
+  public String getDefaultKuduMasterHosts() { return defaultKuduMasterHosts_; }
 }
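
With the new constructor argument, code that previously built a catalog via
new ImpaladCatalog() now has to pass the default Kudu master addresses, e.g.
(a hypothetical snippet; "kudu-master-host:7051" is a placeholder, not a value
from this patch):

  import org.apache.impala.catalog.ImpaladCatalog;

  public class CatalogExample {
    public static void main(String[] args) {
      // The address list is used when CREATE TABLE does not name Kudu masters.
      ImpaladCatalog catalog = new ImpaladCatalog("kudu-master-host:7051");
      System.out.println(catalog.getDefaultKuduMasterHosts());
    }
  }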