You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/25 20:16:01 UTC

[14/33] incubator-impala git commit: IMPALA-3719: Simplify CREATE TABLE statements with Kudu tables

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index d55f8da..d0185b7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -19,21 +19,19 @@ package org.apache.impala.catalog;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import javax.xml.bind.DatatypeConverter;
 
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.log4j.Logger;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.LocatedTablet;
-
+import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.DistributeParam;
+import org.apache.impala.analysis.ToSqlUtils;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TDistributeByHashParam;
+import org.apache.impala.thrift.TDistributeByRangeParam;
+import org.apache.impala.thrift.TDistributeParam;
 import org.apache.impala.thrift.TKuduTable;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
@@ -42,76 +40,86 @@ import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.TResultRowBuilder;
+import org.apache.impala.service.CatalogOpExecutor;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.PartitionSchema.HashBucketSchema;
+import org.apache.kudu.client.PartitionSchema.RangeSchema;
+import org.apache.kudu.client.PartitionSchema;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
 
 /**
- * Impala representation of a Kudu table.
- *
- * The Kudu-related metadata is stored in the Metastore table's table properties.
+ * Representation of a Kudu table in the catalog cache.
  */
 public class KuduTable extends Table {
-  private static final Logger LOG = Logger.getLogger(Table.class);
+
+  private static final Logger LOG = Logger.getLogger(KuduTable.class);
 
   // Alias to the string key that identifies the storage handler for Kudu tables.
   public static final String KEY_STORAGE_HANDLER =
       hive_metastoreConstants.META_TABLE_STORAGE;
 
-  // Key to access the table name from the table properties
+  // Key to access the table name from the table properties.
   public static final String KEY_TABLE_NAME = "kudu.table_name";
 
   // Key to access the columns used to build the (composite) key of the table.
-  // The order of the keys is important.
+  // Deprecated - Used only for error checking.
   public static final String KEY_KEY_COLUMNS = "kudu.key_columns";
 
-  // Key to access the master address from the table properties. Error handling for
+  // Key to access the master host from the table properties. Error handling for
   // this string is done in the KuduClient library.
-  // TODO we should have something like KuduConfig.getDefaultConfig()
-  public static final String KEY_MASTER_ADDRESSES = "kudu.master_addresses";
+  // TODO: Renaming kudu.master_addresses to kudu.master_host will break compatibility
+  // with older versions.
+  public static final String KEY_MASTER_HOSTS = "kudu.master_addresses";
 
   // Kudu specific value for the storage handler table property keyed by
   // KEY_STORAGE_HANDLER.
+  // TODO: Fix the storage handler name (see IMPALA-4271).
   public static final String KUDU_STORAGE_HANDLER =
       "com.cloudera.kudu.hive.KuduStorageHandler";
 
   // Key to specify the number of tablet replicas.
-  // TODO(KUDU): Allow modification in alter table.
   public static final String KEY_TABLET_REPLICAS = "kudu.num_tablet_replicas";
 
   public static final long KUDU_RPC_TIMEOUT_MS = 50000;
 
-  // The name of the table in Kudu.
+  // Table name in the Kudu storage engine. It may not necessarily be the same as the
+  // table name specified in the CREATE TABLE statement; the latter
+  // is stored in Table.name_. Reasons why KuduTable.kuduTableName_ and Table.name_ may
+  // differ:
+  // 1. For managed tables, 'kuduTableName_' is prefixed with 'impala::<db_name>' to
+  // avoid conflicts. TODO: Remove this when Kudu supports databases.
+  // 2. The user may specify a table name using the 'kudu.table_name' table property.
   private String kuduTableName_;
 
   // Comma separated list of Kudu master hosts with optional ports.
   private String kuduMasters_;
 
-  // The set of columns that are key columns in Kudu.
-  private ImmutableList<String> kuduKeyColumnNames_;
+  // Primary key column names.
+  private final List<String> primaryKeyColumnNames_ = Lists.newArrayList();
+
+  // Distribution schemes of this Kudu table. Both range and hash-based distributions are
+  // supported.
+  private final List<DistributeParam> distributeBy_ = Lists.newArrayList();
 
   protected KuduTable(TableId id, org.apache.hadoop.hive.metastore.api.Table msTable,
       Db db, String name, String owner) {
     super(id, msTable, db, name, owner);
-  }
-
-  public TKuduTable getKuduTable() {
-    TKuduTable tbl = new TKuduTable();
-    tbl.setKey_columns(Preconditions.checkNotNull(kuduKeyColumnNames_));
-    tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(",")));
-    tbl.setTable_name(kuduTableName_);
-    return tbl;
-  }
-
-  @Override
-  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
-    TTableDescriptor desc = new TTableDescriptor(id_.asInt(), TTableType.KUDU_TABLE,
-        getTColumnDescriptors(), numClusteringCols_, kuduTableName_, db_.getName());
-    desc.setKuduTable(getKuduTable());
-    return desc;
+    kuduTableName_ = msTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
+    kuduMasters_ = msTable.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
   }
 
   @Override
@@ -126,78 +134,149 @@ public class KuduTable extends Table {
   @Override
   public ArrayList<Column> getColumnsInHiveOrder() { return getColumns(); }
 
-  public static boolean isKuduTable(org.apache.hadoop.hive.metastore.api.Table mstbl) {
-    return KUDU_STORAGE_HANDLER.equals(mstbl.getParameters().get(KEY_STORAGE_HANDLER));
+  public static boolean isKuduTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    return KUDU_STORAGE_HANDLER.equals(msTbl.getParameters().get(KEY_STORAGE_HANDLER));
+  }
+
+  public String getKuduTableName() { return kuduTableName_; }
+  public String getKuduMasterHosts() { return kuduMasters_; }
+
+  public List<String> getPrimaryKeyColumnNames() {
+    return ImmutableList.copyOf(primaryKeyColumnNames_);
+  }
+
+  public List<DistributeParam> getDistributeBy() {
+    return ImmutableList.copyOf(distributeBy_);
   }
 
   /**
-   * Load the columns from the schema list
+   * Loads the metadata of a Kudu table.
+   *
+   * Schema and distribution schemes are loaded directly from Kudu whereas column stats
+   * are loaded from HMS. The function also updates the table schema in HMS in order to
+   * propagate alterations made to the Kudu table to HMS.
    */
-  private void loadColumns(List<FieldSchema> schema, IMetaStoreClient client,
-      Set<String> keyColumns) throws TableLoadingException {
+  @Override
+  public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
+      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+    msTable_ = msTbl;
+    // This is set to 0 for Kudu tables.
+    // TODO: Change this to reflect the number of pk columns and modify all the
+    // places (e.g. insert stmt) that currently make use of this parameter.
+    numClusteringCols_ = 0;
+    kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME);
+    Preconditions.checkNotNull(kuduTableName_);
+    kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+    Preconditions.checkNotNull(kuduMasters_);
+    org.apache.kudu.client.KuduTable kuduTable = null;
+    numRows_ = getRowCount(msTable_.getParameters());
+
+    // Connect to Kudu to retrieve table metadata
+    try (KuduClient kuduClient = new KuduClient.KuduClientBuilder(
+        getKuduMasterHosts()).build()) {
+      kuduTable = kuduClient.openTable(kuduTableName_);
+    } catch (KuduException e) {
+      LOG.error("Error accessing Kudu table " + kuduTableName_);
+      throw new TableLoadingException(e.getMessage());
+    }
+    Preconditions.checkNotNull(kuduTable);
+
+    // Load metadata from Kudu and HMS
+    try {
+      loadSchema(kuduTable);
+      loadDistributeByParams(kuduTable);
+      loadAllColumnStats(msClient);
+    } catch (ImpalaRuntimeException e) {
+      LOG.error("Error loading metadata for Kudu table: " + kuduTableName_);
+      throw new TableLoadingException("Error loading metadata for Kudu table " +
+          kuduTableName_, e);
+    }
 
-    if (keyColumns.size() == 0 || keyColumns.size() > schema.size()) {
-      throw new TableLoadingException(String.format("Kudu tables must have at least one"
-          + "key column (had %d), and no more key columns than there are table columns "
-          + "(had %d).", keyColumns.size(), schema.size()));
+    // Update the table schema in HMS.
+    try {
+      long lastDdlTime = CatalogOpExecutor.calculateDdlTime(msTable_);
+      msTable_.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime));
+      msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS,
+          StatsSetupConst.TRUE);
+      msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_);
+    } catch (TException e) {
+      throw new TableLoadingException(e.getMessage());
     }
+  }
 
+  /**
+   * Loads the schema from the Kudu table including column definitions and primary key
+   * columns. Replaces the columns in the HMS table with the columns from the Kudu table.
+   * Throws an ImpalaRuntimeException if Kudu column data types cannot be mapped to
+   * Impala data types.
+   */
+  private void loadSchema(org.apache.kudu.client.KuduTable kuduTable)
+      throws ImpalaRuntimeException {
+    Preconditions.checkNotNull(kuduTable);
     clearColumns();
-    Set<String> columnNames = Sets.newHashSet();
+    primaryKeyColumnNames_.clear();
+    List<FieldSchema> cols = msTable_.getSd().getCols();
+    cols.clear();
     int pos = 0;
-    for (FieldSchema field: schema) {
-      org.apache.impala.catalog.Type type = parseColumnType(field);
-      // TODO(kudu-merge): Check for decimal types?
-      boolean isKey = keyColumns.contains(field.getName());
-      KuduColumn col = new KuduColumn(field.getName(), isKey, !isKey, type,
-          field.getComment(), pos);
-      columnNames.add(col.getName());
-      addColumn(col);
+    for (ColumnSchema colSchema: kuduTable.getSchema().getColumns()) {
+      Type type = KuduUtil.toImpalaType(colSchema.getType());
+      String colName = colSchema.getName();
+      cols.add(new FieldSchema(colName, type.toSql().toLowerCase(), null));
+      boolean isKey = colSchema.isKey();
+      if (isKey) primaryKeyColumnNames_.add(colName);
+      addColumn(new KuduColumn(colName, isKey, !isKey, type, null, pos));
       ++pos;
     }
+  }
 
-    if (!columnNames.containsAll(keyColumns)) {
-      throw new TableLoadingException(String.format("Some key columns were not found in"
-              + " the set of columns. List of column names: %s, List of key column names:"
-              + " %s", Iterables.toString(columnNames), Iterables.toString(keyColumns)));
+  private void loadDistributeByParams(org.apache.kudu.client.KuduTable kuduTable) {
+    Preconditions.checkNotNull(kuduTable);
+    PartitionSchema partitionSchema = kuduTable.getPartitionSchema();
+    Preconditions.checkState(!colsByPos_.isEmpty());
+    distributeBy_.clear();
+    for (HashBucketSchema hashBucketSchema: partitionSchema.getHashBucketSchemas()) {
+      List<String> columnNames = Lists.newArrayList();
+      for (int colPos: hashBucketSchema.getColumnIds()) {
+        columnNames.add(colsByPos_.get(colPos).getName());
+      }
+      distributeBy_.add(
+          DistributeParam.createHashParam(columnNames, hashBucketSchema.getNumBuckets()));
     }
-
-    kuduKeyColumnNames_ = ImmutableList.copyOf(keyColumns);
-
-    loadAllColumnStats(client);
+    RangeSchema rangeSchema = partitionSchema.getRangeSchema();
+    List<Integer> columnIds = rangeSchema.getColumns();
+    if (columnIds.isEmpty()) return;
+    List<String> columnNames = Lists.newArrayList();
+    for (int colPos: columnIds) columnNames.add(colsByPos_.get(colPos).getName());
+    // We don't populate the split values because Kudu's API doesn't currently support
+    // retrieving the split values for range partitions.
+    // TODO: File a Kudu JIRA.
+    distributeBy_.add(DistributeParam.createRangeParam(columnNames, null));
   }
 
-  @Override
-  public void load(boolean reuseMetadata, IMetaStoreClient client,
-      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
-    // TODO handle 'reuseMetadata'
-    if (getMetaStoreTable() == null || !tableParamsAreValid(msTbl.getParameters())) {
-      throw new TableLoadingException(String.format(
-          "Cannot load Kudu table %s, table is corrupt.", name_));
+  /**
+   * Creates a temporary KuduTable object populated with the specified properties but has
+   * an invalid TableId and is not added to the Kudu storage engine or the
+   * HMS. This is used for CTAS statements.
+   */
+  public static KuduTable createCtasTarget(Db db,
+      org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs,
+      List<String> primaryKeyColumnNames, List<DistributeParam> distributeParams) {
+    KuduTable tmpTable = new KuduTable(TableId.createInvalidId(), msTbl, db,
+        msTbl.getTableName(), msTbl.getOwner());
+    int pos = 0;
+    for (ColumnDef colDef: columnDefs) {
+      tmpTable.addColumn(new Column(colDef.getColName(), colDef.getType(), pos++));
     }
-
-    msTable_ = msTbl;
-    kuduTableName_ = msTbl.getParameters().get(KEY_TABLE_NAME);
-    kuduMasters_ = msTbl.getParameters().get(KEY_MASTER_ADDRESSES);
-
-    String keyColumnsProp = Preconditions.checkNotNull(msTbl.getParameters()
-        .get(KEY_KEY_COLUMNS).toLowerCase(), "'kudu.key_columns' cannot be null.");
-    Set<String> keyColumns = KuduUtil.parseKeyColumns(keyColumnsProp);
-
-    // Load the rest of the data from the table parameters directly
-    loadColumns(msTbl.getSd().getCols(), client, keyColumns);
-
-    numClusteringCols_ = 0;
-
-    // Get row count from stats
-    numRows_ = getRowCount(getMetaStoreTable().getParameters());
+    tmpTable.primaryKeyColumnNames_.addAll(primaryKeyColumnNames);
+    tmpTable.distributeBy_.addAll(distributeParams);
+    return tmpTable;
   }
 
   @Override
   public TTable toThrift() {
     TTable table = super.toThrift();
     table.setTable_type(TTableType.KUDU_TABLE);
-    table.setKudu_table(getKuduTable());
+    table.setKudu_table(getTKuduTable());
     return table;
   }
 
@@ -207,33 +286,46 @@ public class KuduTable extends Table {
     TKuduTable tkudu = thriftTable.getKudu_table();
     kuduTableName_ = tkudu.getTable_name();
     kuduMasters_ = Joiner.on(',').join(tkudu.getMaster_addresses());
-    kuduKeyColumnNames_ = ImmutableList.copyOf(tkudu.getKey_columns());
+    primaryKeyColumnNames_.clear();
+    primaryKeyColumnNames_.addAll(tkudu.getKey_columns());
+    loadDistributeByParamsFromThrift(tkudu.getDistribute_by());
   }
 
-  public String getKuduTableName() { return kuduTableName_; }
-  public String getKuduMasterAddresses() { return kuduMasters_; }
-  public int getNumKeyColumns() { return kuduKeyColumnNames_.size(); }
-
-  /**
-   * Returns true if all required parameters are present in the given table properties
-   * map.
-   * TODO(kudu-merge) Return a more specific error string.
-   */
-  public static boolean tableParamsAreValid(Map<String, String> params) {
-    return params.get(KEY_TABLE_NAME) != null && params.get(KEY_TABLE_NAME).length() > 0
-        && params.get(KEY_MASTER_ADDRESSES) != null
-        && params.get(KEY_MASTER_ADDRESSES).length() > 0
-        && params.get(KEY_KEY_COLUMNS) != null
-        && params.get(KEY_KEY_COLUMNS).length() > 0;
-   }
+  private void loadDistributeByParamsFromThrift(List<TDistributeParam> params) {
+    distributeBy_.clear();
+    for (TDistributeParam param: params) {
+      if (param.isSetBy_hash_param()) {
+        TDistributeByHashParam hashParam = param.getBy_hash_param();
+        distributeBy_.add(DistributeParam.createHashParam(
+            hashParam.getColumns(), hashParam.getNum_buckets()));
+      } else {
+        Preconditions.checkState(param.isSetBy_range_param());
+        TDistributeByRangeParam rangeParam = param.getBy_range_param();
+        distributeBy_.add(DistributeParam.createRangeParam(rangeParam.getColumns(),
+            null));
+      }
+    }
+  }
 
-  /**
-   * The number of nodes is not know ahead of time and will be updated during computeStats
-   * in the scan node.
-   */
-  public int getNumNodes() { return -1; }
+  @Override
+  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
+    TTableDescriptor desc = new TTableDescriptor(id_.asInt(), TTableType.KUDU_TABLE,
+        getTColumnDescriptors(), numClusteringCols_, kuduTableName_, db_.getName());
+    desc.setKuduTable(getTKuduTable());
+    return desc;
+  }
 
-  public List<String> getKuduKeyColumnNames() { return kuduKeyColumnNames_; }
+  private TKuduTable getTKuduTable() {
+    TKuduTable tbl = new TKuduTable();
+    tbl.setKey_columns(Preconditions.checkNotNull(primaryKeyColumnNames_));
+    tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(",")));
+    tbl.setTable_name(kuduTableName_);
+    Preconditions.checkNotNull(distributeBy_);
+    for (DistributeParam distributeParam: distributeBy_) {
+      tbl.addToDistribute_by(distributeParam.toThrift());
+    }
+    return tbl;
+  }
 
   public TResultSet getTableStats() throws ImpalaRuntimeException {
     TResultSet result = new TResultSet();
@@ -247,7 +339,7 @@ public class KuduTable extends Table {
     resultSchema.addToColumns(new TColumn("# Replicas", Type.INT.toThrift()));
 
     try (KuduClient client = new KuduClient.KuduClientBuilder(
-        getKuduMasterAddresses()).build()) {
+        getKuduMasterHosts()).build()) {
       org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_);
       List<LocatedTablet> tablets =
           kuduTable.getTabletsLocations(KUDU_RPC_TIMEOUT_MS);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 6145cc5..4b40b44 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -455,6 +455,11 @@ public abstract class Table implements CatalogObject {
   @Override
   public boolean isLoaded() { return true; }
 
+  public static boolean isExternalTable(
+      org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    return msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
+  }
+
   /**
    * If the table is cached, it returns a <cache pool name, replication factor> pair
    * and adds the table cached directive ID to 'cacheDirIds'. Otherwise, it

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index 764abe0..8541a3a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -18,7 +18,6 @@
 package org.apache.impala.catalog;
 
 import java.util.EnumSet;
-import java.util.Set;
 
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/Type.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Type.java b/fe/src/main/java/org/apache/impala/catalog/Type.java
index 91fc2e3..05c71c7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Type.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Type.java
@@ -317,6 +317,15 @@ public abstract class Type {
   }
 
   /**
+   * Checks if types t1 and t2 are assignment compatible, i.e. if both t1 and t2 can be
+   * assigned to a type t without an explicit cast and without any conversions that would
+   * result in loss of precision.
+   */
+  public static boolean areAssignmentCompatibleTypes(Type t1, Type t2) {
+    return getAssignmentCompatibleType(t1, t2, true) != ScalarType.INVALID;
+  }
+
+  /**
    * Returns true if this type exceeds the MAX_NESTING_DEPTH, false otherwise.
    */
   public boolean exceedsMaxNestingDepth() { return exceedsMaxNestingDepth(0); }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/delegates/DdlDelegate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/delegates/DdlDelegate.java b/fe/src/main/java/org/apache/impala/catalog/delegates/DdlDelegate.java
deleted file mode 100644
index 6c3ba8e..0000000
--- a/fe/src/main/java/org/apache/impala/catalog/delegates/DdlDelegate.java
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog.delegates;
-
-import java.util.List;
-
-import org.apache.impala.thrift.TDistributeParam;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import org.apache.impala.common.ImpalaRuntimeException;
-import org.apache.impala.thrift.TAlterTableParams;
-
-/**
- * Abstract class to implement the storage specific portion of DDL requests.
- *
- * During catalog DDL operations the CatalogOpExecutor will instantiate the correct
- * subclass of this class to handle the DDL operation to the storage backend. See,
- * CatalogOpExecutor::createDDLDelegate() for details.
- *
- */
-public abstract class DdlDelegate {
-
-  protected Table msTbl_;
-  protected TAlterTableParams tAlterTableParams_;
-  protected List<TDistributeParam> distributeParams_;
-
-  /**
-   * Creates a new delegate to modify Table 'msTbl'.
-   */
-  public DdlDelegate setMsTbl(Table msTbl) {
-    msTbl_ = msTbl;
-    return this;
-  }
-
-  public DdlDelegate setAlterTableParams(TAlterTableParams p) {
-    tAlterTableParams_ = p;
-    return this;
-  }
-
-  public DdlDelegate setDistributeParams(List<TDistributeParam> p) {
-    distributeParams_ = p;
-    return this;
-  }
-
-  /**
-   * Creates the table.
-   */
-  public abstract void createTable() throws ImpalaRuntimeException;
-
-  /**
-   * Drops the table.
-   */
-  public abstract void dropTable() throws ImpalaRuntimeException;
-
-  /**
-   * Performs an alter table with the parameters set with setAlterTableParams().
-   */
-  public abstract boolean alterTable() throws ImpalaRuntimeException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/delegates/KuduDdlDelegate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/delegates/KuduDdlDelegate.java b/fe/src/main/java/org/apache/impala/catalog/delegates/KuduDdlDelegate.java
deleted file mode 100644
index 8410868..0000000
--- a/fe/src/main/java/org/apache/impala/catalog/delegates/KuduDdlDelegate.java
+++ /dev/null
@@ -1,190 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog.delegates;
-
-import static org.apache.impala.util.KuduUtil.compareSchema;
-import static org.apache.impala.util.KuduUtil.fromImpalaType;
-import static org.apache.impala.util.KuduUtil.parseKeyColumns;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.PartialRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.impala.catalog.KuduTable;
-import org.apache.impala.common.ImpalaRuntimeException;
-import org.apache.impala.thrift.TDistributeParam;
-import org.apache.impala.util.KuduUtil;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-
-/**
- * Implementation of the Kudu DDL Delegate. Propagates create and drop table statements to
- * Kudu.
- */
-public class KuduDdlDelegate extends DdlDelegate {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KuduDdlDelegate.class);
-
-  public KuduDdlDelegate(Table msTbl) {
-    setMsTbl(msTbl);
-  }
-
-  /**
-   * Creates the Kudu table if it does not exist and returns true. If the table exists and
-   * the table is not a managed table ignore and return false, otherwise throw an
-   * exception.
-   */
-  @Override
-  public void createTable()
-      throws ImpalaRuntimeException {
-
-    String kuduTableName = msTbl_.getParameters().get(KuduTable.KEY_TABLE_NAME);
-    String kuduMasters = msTbl_.getParameters().get(KuduTable.KEY_MASTER_ADDRESSES);
-
-    // Can be optional for un-managed tables
-    String kuduKeyCols = msTbl_.getParameters().get(KuduTable.KEY_KEY_COLUMNS);
-
-    String replication = msTbl_.getParameters().get(KuduTable.KEY_TABLET_REPLICAS);
-
-    try (KuduClient client = new KuduClient.KuduClientBuilder(kuduMasters).build()) {
-      // TODO should we throw if the table does not exist when its an external table?
-      if (client.tableExists(kuduTableName)) {
-        if (msTbl_.getTableType().equals(TableType.MANAGED_TABLE.toString())) {
-          throw new ImpalaRuntimeException(String.format(
-              "Table %s already exists in Kudu master %s.", kuduTableName, kuduMasters));
-        }
-
-        // Check if the external table matches the schema
-        org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName);
-        if (!compareSchema(msTbl_, kuduTable)) {
-          throw new ImpalaRuntimeException(String.format(
-              "Table %s (%s) has a different schema in Kudu than in Hive.",
-              msTbl_.getTableName(), kuduTableName));
-        }
-        return;
-      }
-
-      HashSet<String> keyColNames = parseKeyColumns(kuduKeyCols);
-      List<ColumnSchema> keyColSchemas = new ArrayList<>();
-
-      // Create a new Schema and map the types accordingly
-      ArrayList<ColumnSchema> columns = Lists.newArrayList();
-      for (FieldSchema fieldSchema: msTbl_.getSd().getCols()) {
-        org.apache.impala.catalog.Type catalogType = org.apache.impala.catalog.Type
-            .parseColumnType(fieldSchema.getType());
-        if (catalogType == null) {
-          throw new ImpalaRuntimeException(String.format(
-              "Could not parse column type %s.", fieldSchema.getType()));
-        }
-        Type t = fromImpalaType(catalogType);
-        // Create the actual column and check if the column is a key column
-        ColumnSchemaBuilder csb = new ColumnSchemaBuilder(
-            fieldSchema.getName(), t);
-        boolean isKeyColumn = keyColNames.contains(fieldSchema.getName());
-        csb.key(isKeyColumn);
-        csb.nullable(!isKeyColumn);
-        ColumnSchema cs = csb.build();
-        columns.add(cs);
-        if (isKeyColumn) keyColSchemas.add(cs);
-      }
-
-      Schema schema = new Schema(columns);
-      CreateTableOptions cto = new CreateTableOptions();
-
-      // Handle auto-partitioning of the Kudu table
-      if (distributeParams_ != null) {
-        for (TDistributeParam param : distributeParams_) {
-          if (param.isSetBy_hash_param()) {
-            Preconditions.checkState(!param.isSetBy_range_param());
-            cto.addHashPartitions(param.getBy_hash_param().getColumns(),
-                param.getBy_hash_param().getNum_buckets());
-          } else {
-            Preconditions.checkState(param.isSetBy_range_param());
-            cto.setRangePartitionColumns(param.getBy_range_param().getColumns());
-            for (PartialRow p : KuduUtil.parseSplits(schema, param.getBy_range_param())) {
-              cto.addSplitRow(p);
-            }
-          }
-        }
-      }
-
-      if (!Strings.isNullOrEmpty(replication)) {
-        int r = Integer.parseInt(replication);
-        if (r <= 0) {
-          throw new ImpalaRuntimeException(
-              "Number of tablet replicas must be greater than zero. " +
-              "Given number of replicas is: " + Integer.toString(r));
-        }
-        cto.setNumReplicas(r);
-      }
-
-      client.createTable(kuduTableName, schema, cto);
-    } catch (ImpalaRuntimeException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new ImpalaRuntimeException("Error creating Kudu table", e);
-    }
-  }
-
-  @Override
-  public void dropTable() throws ImpalaRuntimeException {
-    // If table is an external table, do not delete the data
-    if (msTbl_.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) return;
-
-    String kuduTableName = msTbl_.getParameters().get(KuduTable.KEY_TABLE_NAME);
-    String kuduMasters = msTbl_.getParameters().get(KuduTable.KEY_MASTER_ADDRESSES);
-
-    try (KuduClient client = new KuduClient.KuduClientBuilder(kuduMasters).build()) {
-      if (!client.tableExists(kuduTableName)) {
-        LOG.warn("Table: %s is in inconsistent state. It does not exist in Kudu master(s)"
-            + " %s, but it exists in Hive metastore. Deleting from metastore only.",
-            kuduTableName, kuduMasters);
-        return;
-      }
-      client.deleteTable(kuduTableName);
-      return;
-    } catch (Exception e) {
-      throw new ImpalaRuntimeException("Error dropping Kudu table", e);
-    }
-  }
-
-  public static boolean canHandle(org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    return KuduTable.isKuduTable(msTbl);
-  }
-
-  @Override
-  public boolean alterTable() throws ImpalaRuntimeException {
-    throw new ImpalaRuntimeException(
-        "Alter table operations are not supported for Kudu tables.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/delegates/UnsupportedOpDelegate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/delegates/UnsupportedOpDelegate.java b/fe/src/main/java/org/apache/impala/catalog/delegates/UnsupportedOpDelegate.java
deleted file mode 100644
index 8aabaa4..0000000
--- a/fe/src/main/java/org/apache/impala/catalog/delegates/UnsupportedOpDelegate.java
+++ /dev/null
@@ -1,35 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog.delegates;
-
-import org.apache.impala.common.ImpalaRuntimeException;
-
-/**
- * Empty implementation for the DdlDelegate interface that does nothing.
- */
-public class UnsupportedOpDelegate extends DdlDelegate {
-
-  @Override
-  public void createTable() throws ImpalaRuntimeException { }
-
-  @Override
-  public void dropTable() throws ImpalaRuntimeException { }
-
-  @Override
-  public boolean alterTable() throws ImpalaRuntimeException { return true; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java
index 3345c1b..8d15425 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java
@@ -52,7 +52,7 @@ public class HdfsPartitionFilter {
 
   // lhs exprs of smap used in isMatch()
   private final ArrayList<SlotRef> lhsSlotRefs_ = Lists.newArrayList();
-  // indices into Table.getColumns()
+  // indices into Table.getColumnNames()
   private final ArrayList<Integer> refdKeys_ = Lists.newArrayList();
 
   public HdfsPartitionFilter(Expr predicate, HdfsTable tbl, Analyzer analyzer) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 64ef822..9434801 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -107,7 +107,7 @@ public class KuduScanNode extends ScanNode {
     conjuncts_ = orderConjunctsByCost(conjuncts_);
 
     try (KuduClient client =
-         new KuduClientBuilder(kuduTable_.getKuduMasterAddresses()).build()) {
+         new KuduClientBuilder(kuduTable_.getKuduMasterHosts()).build()) {
       org.apache.kudu.client.KuduTable rpcTable =
           client.openTable(kuduTable_.getKuduTableName());
       validateSchema(rpcTable);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 5743a59..54493d1 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -51,6 +51,11 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.TableName;
@@ -61,7 +66,6 @@ import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnNotFoundException;
 import org.apache.impala.catalog.DataSource;
-import org.apache.impala.catalog.DatabaseNotFoundException;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HBaseTable;
@@ -70,6 +74,7 @@ import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.HiveStorageDescriptorFactory;
 import org.apache.impala.catalog.IncompleteTable;
+import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.PartitionNotFoundException;
 import org.apache.impala.catalog.PartitionStatsUtil;
@@ -82,9 +87,6 @@ import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
-import org.apache.impala.catalog.delegates.DdlDelegate;
-import org.apache.impala.catalog.delegates.KuduDdlDelegate;
-import org.apache.impala.catalog.delegates.UnsupportedOpDelegate;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -121,7 +123,6 @@ import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDdlExecRequest;
 import org.apache.impala.thrift.TDdlExecResponse;
-import org.apache.impala.thrift.TDistributeParam;
 import org.apache.impala.thrift.TDropDataSourceParams;
 import org.apache.impala.thrift.TDropDbParams;
 import org.apache.impala.thrift.TDropFunctionParams;
@@ -149,11 +150,6 @@ import org.apache.impala.thrift.TTruncateParams;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TUpdateCatalogResponse;
 import org.apache.impala.util.HdfsCachingUtil;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 /**
  * Class used to execute Catalog Operations, including DDL and refresh/invalidate
@@ -1103,8 +1099,7 @@ public class CatalogOpExecutor {
 
   /**
    * Drops a database from the metastore and removes the database's metadata from the
-   * internal cache. Re-throws any Hive Meta Store exceptions encountered during
-   * the drop.
+   * internal cache. Re-throws any HMS exceptions encountered during the drop.
    */
   private void dropDatabase(TDropDbParams params, TDdlExecResponse resp)
       throws ImpalaException {
@@ -1120,6 +1115,9 @@ public class CatalogOpExecutor {
 
     TCatalogObject removedObject = new TCatalogObject();
     synchronized (metastoreDdlLock_) {
+      // Remove all the Kudu tables of 'db' from the Kudu storage engine.
+      if (db != null && params.cascade) dropTablesFromKudu(db);
+
       try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
         msClient.getHiveClient().dropDatabase(
             params.getDb(), true, params.if_exists, params.cascade);
@@ -1144,6 +1142,44 @@ public class CatalogOpExecutor {
   }
 
   /**
+   * Drops all the managed Kudu tables of database 'db' from the Kudu storage engine.
+   * Metadata of tables that are not yet loaded is fetched from HMS; if that lookup
+   * fails, the error is only logged and those tables are skipped. Throws an
+   * ImpalaException if an error occurs while trying to drop a table from Kudu.
+   */
+  private void dropTablesFromKudu(Db db) throws ImpalaException {
+    // If the table format isn't available, because the table hasn't been loaded yet,
+    // the metadata must be fetched from the Hive Metastore.
+    List<String> incompleteTableNames = Lists.newArrayList();
+    List<org.apache.hadoop.hive.metastore.api.Table> msTables = Lists.newArrayList();
+    for (Table table: db.getTables()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable();
+      if (msTable == null) {
+        incompleteTableNames.add(table.getName());
+      } else {
+        msTables.add(msTable);
+      }
+    }
+    if (!incompleteTableNames.isEmpty()) {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        msTables.addAll(msClient.getHiveClient().getTableObjectsByName(
+            db.getName(), incompleteTableNames));
+      } catch (TException e) {
+        LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, "getTableObjectsByName") +
+            e.getMessage());
+      }
+    }
+    for (org.apache.hadoop.hive.metastore.api.Table msTable: msTables) {
+      if (!KuduTable.isKuduTable(msTable) || Table.isExternalTable(msTable)) continue;
+      // The operation will be aborted if the Kudu table cannot be dropped. If for
+      // some reason Kudu is permanently stuck in a non-functional state, the user is
+      // expected to ALTER TABLE to either set the table to UNMANAGED or set the format
+      // to something else.
+      KuduCatalogOpExecutor.dropTable(msTable, /*if exists*/ true);
+    }
+  }
+
+  /**
    * Drops a table or view from the metastore and removes it from the catalog.
    * Also drops all associated caching requests on the table and/or table's partitions,
    * uncaching all table data. If params.purge is true, table data is permanently
@@ -1157,17 +1193,6 @@ public class CatalogOpExecutor {
 
     TCatalogObject removedObject = new TCatalogObject();
     synchronized (metastoreDdlLock_) {
-
-      // Forward the DDL operation to the specified storage backend.
-      try {
-        org.apache.hadoop.hive.metastore.api.Table msTbl = getExistingTable(
-            tableName.getDb(), tableName.getTbl()).getMetaStoreTable();
-        DdlDelegate handler = createDdlDelegate(msTbl);
-        handler.dropTable();
-      } catch (TableNotFoundException | DatabaseNotFoundException e) {
-        // Do nothing
-      }
-
       Db db = catalog_.getDb(params.getTable_name().db_name);
       if (db == null) {
         if (params.if_exists) return;
@@ -1179,6 +1204,23 @@ public class CatalogOpExecutor {
         if (params.if_exists) return;
         throw new CatalogException("Table/View does not exist: " + tableName);
       }
+
+      // Retrieve the HMS table to determine if this is a Kudu table.
+      org.apache.hadoop.hive.metastore.api.Table msTbl = existingTbl.getMetaStoreTable();
+      if (msTbl == null) {
+        Preconditions.checkState(existingTbl instanceof IncompleteTable);
+        try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+          msTbl = msClient.getHiveClient().getTable(tableName.getDb(),
+              tableName.getTbl());
+        } catch (TException e) {
+          LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, "getTable") + e.getMessage());
+        }
+      }
+      if (msTbl != null && KuduTable.isKuduTable(msTbl)
+          && !Table.isExternalTable(msTbl)) {
+        KuduCatalogOpExecutor.dropTable(msTbl, /* if exists */ true);
+      }
+
       // Check to make sure we don't drop a view with "drop table" statement and
       // vice versa. is_table field is marked optional in TDropTableOrViewParams to
       // maintain catalog api compatibility.
@@ -1343,7 +1385,8 @@ public class CatalogOpExecutor {
 
   /**
    * Creates a new table in the metastore and adds an entry to the metadata cache to
-   * lazily load the new metadata on the next access. Re-throws any Hive Meta Store
+   * lazily load the new metadata on the next access. If this is a managed Kudu table,
+   * the table is also created in the Kudu storage engine. Re-throws any HMS or Kudu
    * exceptions encountered during the create.
    */
   private boolean createTable(TCreateTableParams params, TDdlExecResponse response)
@@ -1351,9 +1394,8 @@ public class CatalogOpExecutor {
     Preconditions.checkNotNull(params);
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Preconditions.checkState(tableName != null && tableName.isFullyQualified());
-    Preconditions.checkState(params.getColumns() != null &&
-        params.getColumns().size() > 0,
-        "Null or empty column list given as argument to Catalog.createTable");
+    Preconditions.checkState(params.getColumns() != null,
+        "Null column list given as argument to Catalog.createTable");
 
     if (params.if_not_exists &&
         catalog_.containsTable(tableName.getDb(), tableName.getTbl())) {
@@ -1362,11 +1404,161 @@ public class CatalogOpExecutor {
       response.getResult().setVersion(catalog_.getCatalogVersion());
       return false;
     }
-    org.apache.hadoop.hive.metastore.api.Table tbl =
-        createMetaStoreTable(params);
+    org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
     LOG.debug(String.format("Creating table %s", tableName));
-    return createTable(tbl, params.if_not_exists, params.getCache_op(),
-        params.getDistribute_by(), response);
+    if (KuduTable.isKuduTable(tbl)) return createKuduTable(tbl, params, response);
+    Preconditions.checkState(params.getColumns().size() > 0,
+        "Empty column list given as argument to Catalog.createTable");
+    return createTable(tbl, params.if_not_exists, params.getCache_op(), response);
+  }
+
+  /**
+   * Utility function that creates a hive.metastore.api.Table object based on the given
+   * TCreateTableParams.
+   * TODO: Extract metastore object creation utility functions into a separate
+   * helper/factory class.
+   */
+  public static org.apache.hadoop.hive.metastore.api.Table createMetaStoreTable(
+      TCreateTableParams params) {
+    Preconditions.checkNotNull(params);
+    TableName tableName = TableName.fromThrift(params.getTable_name());
+    org.apache.hadoop.hive.metastore.api.Table tbl =
+        new org.apache.hadoop.hive.metastore.api.Table();
+    tbl.setDbName(tableName.getDb());
+    tbl.setTableName(tableName.getTbl());
+    tbl.setOwner(params.getOwner());
+    if (params.isSetTable_properties()) {
+      tbl.setParameters(params.getTable_properties());
+    } else {
+      tbl.setParameters(new HashMap<String, String>());
+    }
+
+    if (params.getComment() != null) {
+      tbl.getParameters().put("comment", params.getComment());
+    }
+    if (params.is_external) {
+      tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
+      tbl.putToParameters("EXTERNAL", "TRUE");
+    } else {
+      tbl.setTableType(TableType.MANAGED_TABLE.toString());
+    }
+
+    tbl.setSd(createSd(params));
+    if (params.getPartition_columns() != null) {
+      // Add in any partition keys that were specified
+      tbl.setPartitionKeys(buildFieldSchemaList(params.getPartition_columns()));
+    } else {
+      tbl.setPartitionKeys(new ArrayList<FieldSchema>());
+    }
+    return tbl;
+  }
+
+  private static StorageDescriptor createSd(TCreateTableParams params) {
+    StorageDescriptor sd = HiveStorageDescriptorFactory.createSd(
+        params.getFile_format(), RowFormat.fromThrift(params.getRow_format()));
+    if (params.isSetSerde_properties()) {
+      if (sd.getSerdeInfo().getParameters() == null) {
+        sd.getSerdeInfo().setParameters(params.getSerde_properties());
+      } else {
+        sd.getSerdeInfo().getParameters().putAll(params.getSerde_properties());
+      }
+    }
+
+    if (params.getLocation() != null) sd.setLocation(params.getLocation());
+
+    // Add in all the columns
+    sd.setCols(buildFieldSchemaList(params.getColumns()));
+    return sd;
+  }
+
+  /**
+   * Creates a new Kudu table. The Kudu table is first created in the Kudu storage engine
+   * (only applicable to managed tables), then in HMS and finally in the catalog cache.
+   * Failure to add the table in HMS results in the table being dropped from Kudu.
+   * 'response' is populated with the results of this operation. Returns true if a new
+   * table was created as part of this call, false otherwise.
+   */
+  private boolean createKuduTable(org.apache.hadoop.hive.metastore.api.Table newTable,
+      TCreateTableParams params, TDdlExecResponse response) throws ImpalaException {
+    Preconditions.checkState(KuduTable.isKuduTable(newTable));
+    if (Table.isExternalTable(newTable)) {
+      KuduCatalogOpExecutor.populateColumnsFromKudu(newTable);
+    } else {
+      KuduCatalogOpExecutor.createManagedTable(newTable, params);
+    }
+    try {
+      // Add the table to the HMS and the catalog cache. Acquire metastoreDdlLock_ to
+      // ensure the atomicity of these operations.
+      synchronized (metastoreDdlLock_) {
+        try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+          msClient.getHiveClient().createTable(newTable);
+        }
+        // Add the table to the catalog cache
+        Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
+        addTableToCatalogUpdate(newTbl, response.result);
+      }
+    } catch (Exception e) {
+      try {
+        // Error creating the table in HMS, drop the managed table from Kudu.
+        if (!Table.isExternalTable(newTable)) {
+          KuduCatalogOpExecutor.dropTable(newTable, false);
+        }
+      } catch (Exception logged) {
+        String kuduTableName = newTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
+        LOG.error(String.format("Failed to drop Kudu table '%s'", kuduTableName),
+            logged);
+        throw new RuntimeException(String.format("Failed to create the table '%s' in " +
+            " the Metastore and the newly created Kudu table '%s' could not be " +
+            " dropped. The log contains more information.", newTable.getTableName(),
+            kuduTableName), e);
+      }
+      if (e instanceof AlreadyExistsException && params.if_not_exists) return false;
+      throw new ImpalaRuntimeException(
+          String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
+    }
+    return true;
+  }
+
+  /**
+   * Creates a new table. The table is initially created in HMS and, if that operation
+   * succeeds, it is then added in the catalog cache. It also sets HDFS caching if
+   * 'cacheOp' is not null. 'response' is populated with the results of this operation.
+   * Returns true if a new table was created as part of this call, false otherwise.
+   */
+  private boolean createTable(org.apache.hadoop.hive.metastore.api.Table newTable,
+      boolean if_not_exists, THdfsCachingOp cacheOp, TDdlExecResponse response)
+      throws ImpalaException {
+    Preconditions.checkState(!KuduTable.isKuduTable(newTable));
+    synchronized (metastoreDdlLock_) {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        msClient.getHiveClient().createTable(newTable);
+        // If this table should be cached, and the table location was not specified by
+        // the user, an extra step is needed to read the table to find the location.
+        if (cacheOp != null && cacheOp.isSet_cached() &&
+            newTable.getSd().getLocation() == null) {
+          newTable = msClient.getHiveClient().getTable(
+              newTable.getDbName(), newTable.getTableName());
+        }
+      } catch (Exception e) {
+        if (e instanceof AlreadyExistsException && if_not_exists) return false;
+        throw new ImpalaRuntimeException(
+            String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
+      }
+
+      // Submit the cache request and update the table metadata.
+      if (cacheOp != null && cacheOp.isSet_cached()) {
+        short replication = cacheOp.isSetReplication() ? cacheOp.getReplication() :
+            JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
+        long id = HdfsCachingUtil.submitCacheTblDirective(newTable,
+            cacheOp.getCache_pool_name(), replication);
+        catalog_.watchCacheDirs(Lists.<Long>newArrayList(id),
+            new TTableName(newTable.getDbName(), newTable.getTableName()));
+        applyAlterTable(newTable);
+      }
+      Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
+      addTableToCatalogUpdate(newTbl, response.result);
+    }
+    return true;
   }
 
   /**
@@ -1392,7 +1584,7 @@ public class CatalogOpExecutor {
         new org.apache.hadoop.hive.metastore.api.Table();
     setViewAttributes(params, view);
     LOG.debug(String.format("Creating view %s", tableName));
-    createTable(view, params.if_not_exists, null, null, response);
+    createTable(view, params.if_not_exists, null, response);
   }
 
   /**
@@ -1423,6 +1615,8 @@ public class CatalogOpExecutor {
     Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl());
     org.apache.hadoop.hive.metastore.api.Table tbl =
         srcTable.getMetaStoreTable().deepCopy();
+    Preconditions.checkState(!KuduTable.isKuduTable(tbl),
+        "CREATE TABLE LIKE is not supported for Kudu tables.");
     tbl.setDbName(tblName.getDb());
     tbl.setTableName(tblName.getTbl());
     tbl.setOwner(params.getOwner());
@@ -1460,7 +1654,7 @@ public class CatalogOpExecutor {
     tbl.getSd().setLocation(params.getLocation());
     if (fileFormat != null) {
       setStorageDescriptorFileFormat(tbl.getSd(), fileFormat);
-    } else if (fileFormat == null && srcTable instanceof View) {
+    } else if (srcTable instanceof View) {
       // Here, source table is a view which has no input format. So to be
       // consistent with CREATE TABLE, default input format is assumed to be
       // TEXT unless otherwise specified.
@@ -1469,85 +1663,7 @@ public class CatalogOpExecutor {
     // Set the row count of this table to unknown.
     tbl.putToParameters(StatsSetupConst.ROW_COUNT, "-1");
     LOG.debug(String.format("Creating table %s LIKE %s", tblName, srcTblName));
-    createTable(tbl, params.if_not_exists, null, null, response);
-  }
-
-  /**
-   * Creates a new table in the HMS. If ifNotExists=true, no error will be thrown if
-   * the table already exists, otherwise an exception will be thrown.
-   * Accepts an optional 'cacheOp' param, which if specified will cache the table's
-   * HDFS location according to the 'cacheOp' spec after creation.
-   * Stores details of the operations (such as the resulting catalog version) in
-   * 'response' output parameter.
-   * Returns true if a new table was created as part of this call, false otherwise.
-   */
-  private boolean createTable(org.apache.hadoop.hive.metastore.api.Table newTable,
-      boolean ifNotExists, THdfsCachingOp cacheOp, List<TDistributeParam> distribute_by,
-      TDdlExecResponse response)
-      throws ImpalaException {
-    synchronized (metastoreDdlLock_) {
-
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-        msClient.getHiveClient().createTable(newTable);
-        // If this table should be cached, and the table location was not specified by
-        // the user, an extra step is needed to read the table to find the location.
-        if (cacheOp != null && cacheOp.isSet_cached() &&
-            newTable.getSd().getLocation() == null) {
-          newTable = msClient.getHiveClient().getTable(newTable.getDbName(),
-              newTable.getTableName());
-        }
-      } catch (AlreadyExistsException e) {
-        if (!ifNotExists) {
-          throw new ImpalaRuntimeException(
-              String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
-        }
-        LOG.debug(String.format("Ignoring '%s' when creating table %s.%s because " +
-            "IF NOT EXISTS was specified.", e,
-            newTable.getDbName(), newTable.getTableName()));
-        return false;
-      } catch (TException e) {
-        throw new ImpalaRuntimeException(
-            String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
-      }
-
-      // Forward the operation to a specific storage backend. If the operation fails,
-      // delete the just created hive table to avoid inconsistencies.
-      try {
-        createDdlDelegate(newTable).setDistributeParams(distribute_by).createTable();
-      } catch (ImpalaRuntimeException e) {
-        try (MetaStoreClient c = catalog_.getMetaStoreClient()) {
-          c.getHiveClient().dropTable(newTable.getDbName(), newTable.getTableName(),
-              false, ifNotExists);
-        } catch (Exception hE) {
-          throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
-              "dropTable"), hE);
-        }
-        throw e;
-      }
-
-      // Submit the cache request and update the table metadata.
-      if (cacheOp != null && cacheOp.isSet_cached()) {
-        short replication = cacheOp.isSetReplication() ? cacheOp.getReplication() :
-            JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
-        long id = HdfsCachingUtil.submitCacheTblDirective(newTable,
-            cacheOp.getCache_pool_name(), replication);
-        catalog_.watchCacheDirs(Lists.<Long>newArrayList(id),
-            new TTableName(newTable.getDbName(), newTable.getTableName()));
-        applyAlterTable(newTable);
-      }
-      Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
-      addTableToCatalogUpdate(newTbl, response.result);
-    }
-    return true;
-  }
-
-  /**
-   * Instantiate the appropriate DDL delegate for the table. If no known delegate is
-   * available for the table, returns a UnsupportedOpDelegate instance.
-   */
-  private DdlDelegate createDdlDelegate(org.apache.hadoop.hive.metastore.api.Table tab) {
-    if (KuduDdlDelegate.canHandle(tab)) return new KuduDdlDelegate(tab);
-    return new UnsupportedOpDelegate();
+    createTable(tbl, params.if_not_exists, null, response);
   }
 
   /**
@@ -1967,6 +2083,9 @@ public class CatalogOpExecutor {
       switch (params.getTarget()) {
         case TBL_PROPERTY:
           msTbl.getParameters().putAll(properties);
+          if (KuduTable.isKuduTable(msTbl)) {
+            KuduCatalogOpExecutor.validateKuduTblExists(msTbl);
+          }
           break;
         case SERDE_PROPERTY:
           msTbl.getSd().getSerdeInfo().getParameters().putAll(properties);
@@ -2120,7 +2239,6 @@ public class CatalogOpExecutor {
     Preconditions.checkNotNull(cacheOp);
     Preconditions.checkNotNull(params.getPartition_spec());
     // Alter partition params.
-    final String RUNTIME_FILTER_FORMAT = "apply %s on %s";
     TableName tableName = tbl.getTableName();
     HdfsPartition partition = catalog_.getHdfsPartition(
         tableName.getDb(), tableName.getTbl(), params.getPartition_spec());
@@ -2535,16 +2653,6 @@ public class CatalogOpExecutor {
   }
 
   /**
-   * Returns a deep copy of the metastore.api.Table object for the given TableName.
-   */
-  private org.apache.hadoop.hive.metastore.api.Table getMetaStoreTable(
-      TableName tableName) throws CatalogException {
-    Preconditions.checkState(tableName != null && tableName.isFullyQualified());
-    return getExistingTable(tableName.getDb(), tableName.getTbl())
-        .getMetaStoreTable().deepCopy();
-  }
-
-  /**
    * Returns the metastore.api.Table object from the Hive Metastore for an existing
    * fully loaded table.
    */
@@ -2608,7 +2716,7 @@ public class CatalogOpExecutor {
   /**
    * Calculates the next transient_lastDdlTime value.
    */
-  private static long calculateDdlTime(
+  public static long calculateDdlTime(
       org.apache.hadoop.hive.metastore.api.Table msTbl) {
     long existingLastDdlTime = CatalogServiceCatalog.getLastDdlTime(msTbl);
     long currentTime = System.currentTimeMillis() / 1000;
@@ -2617,63 +2725,6 @@ public class CatalogOpExecutor {
   }
 
   /**
-   * Utility function that creates a hive.metastore.api.Table object based on the given
-   * TCreateTableParams.
-   * TODO: Extract metastore object creation utility functions into a separate
-   * helper/factory class.
-   */
-  public static org.apache.hadoop.hive.metastore.api.Table
-      createMetaStoreTable(TCreateTableParams params) {
-    Preconditions.checkNotNull(params);
-    TableName tableName = TableName.fromThrift(params.getTable_name());
-    org.apache.hadoop.hive.metastore.api.Table tbl =
-        new org.apache.hadoop.hive.metastore.api.Table();
-    tbl.setDbName(tableName.getDb());
-    tbl.setTableName(tableName.getTbl());
-    tbl.setOwner(params.getOwner());
-    if (params.isSetTable_properties()) {
-      tbl.setParameters(params.getTable_properties());
-    } else {
-      tbl.setParameters(new HashMap<String, String>());
-    }
-
-    if (params.getComment() != null) {
-      tbl.getParameters().put("comment", params.getComment());
-    }
-    if (params.is_external) {
-      tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
-      tbl.putToParameters("EXTERNAL", "TRUE");
-    } else {
-      tbl.setTableType(TableType.MANAGED_TABLE.toString());
-    }
-
-    StorageDescriptor sd = HiveStorageDescriptorFactory.createSd(
-        params.getFile_format(), RowFormat.fromThrift(params.getRow_format()));
-
-    if (params.isSetSerde_properties()) {
-      if (sd.getSerdeInfo().getParameters() == null) {
-        sd.getSerdeInfo().setParameters(params.getSerde_properties());
-      } else {
-        sd.getSerdeInfo().getParameters().putAll(params.getSerde_properties());
-      }
-    }
-
-    if (params.getLocation() != null) {
-      sd.setLocation(params.getLocation());
-    }
-    // Add in all the columns
-    sd.setCols(buildFieldSchemaList(params.getColumns()));
-    tbl.setSd(sd);
-    if (params.getPartition_columns() != null) {
-      // Add in any partition keys that were specified
-      tbl.setPartitionKeys(buildFieldSchemaList(params.getPartition_columns()));
-    } else {
-      tbl.setPartitionKeys(new ArrayList<FieldSchema>());
-    }
-    return tbl;
-  }
-
-  /**
    * Executes a TResetMetadataRequest and returns the result as a
    * TResetMetadataResponse. Based on the request parameters, this operation
    * may do one of three things:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 00a3d93..6d535fd 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -170,9 +170,11 @@ public class Frontend {
   private final AtomicReference<AuthorizationChecker> authzChecker_;
   private final ScheduledExecutorService policyReader_ =
       Executors.newScheduledThreadPool(1);
+  private final String defaultKuduMasterHosts_;
 
-  public Frontend(AuthorizationConfig authorizationConfig) {
-    this(authorizationConfig, new ImpaladCatalog());
+  public Frontend(AuthorizationConfig authorizationConfig,
+      String defaultKuduMasterHosts) {
+    this(authorizationConfig, new ImpaladCatalog(defaultKuduMasterHosts));
   }
 
   /**
@@ -181,6 +183,7 @@ public class Frontend {
   public Frontend(AuthorizationConfig authorizationConfig, ImpaladCatalog catalog) {
     authzConfig_ = authorizationConfig;
     impaladCatalog_ = catalog;
+    defaultKuduMasterHosts_ = catalog.getDefaultKuduMasterHosts();
     authzChecker_ = new AtomicReference<AuthorizationChecker>(
         new AuthorizationChecker(authzConfig_, impaladCatalog_.getAuthPolicy()));
     // If authorization is enabled, reload the policy on a regular basis.
@@ -226,7 +229,7 @@ public class Frontend {
 
     // If this is not a delta, this update should replace the current
     // Catalog contents so create a new catalog and populate it.
-    if (!req.is_delta) catalog = new ImpaladCatalog();
+    if (!req.is_delta) catalog = new ImpaladCatalog(defaultKuduMasterHosts_);
 
     TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index fc8deaf..7d0af54 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -102,7 +102,7 @@ public class JniCatalog {
     try {
       catalog_.reset();
     } catch (CatalogException e) {
-      LOG.error("Error initialializing Catalog. Please run 'invalidate metadata'", e);
+      LOG.error("Error initializing Catalog. Please run 'invalidate metadata'", e);
     }
     catalogOpExecutor_ = new CatalogOpExecutor(catalog_);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/JniFrontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 07d6ec6..0d502e5 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -117,7 +117,8 @@ public class JniFrontend {
    */
   public JniFrontend(boolean lazy, String serverName, String authorizationPolicyFile,
       String sentryConfigFile, String authPolicyProviderClass, int impalaLogLevel,
-      int otherLogLevel, boolean allowAuthToLocal) throws InternalException {
+      int otherLogLevel, boolean allowAuthToLocal, String defaultKuduMasterHosts)
+      throws InternalException {
     BackendConfig.setAuthToLocal(allowAuthToLocal);
     GlogAppender.Install(TLogLevel.values()[impalaLogLevel],
         TLogLevel.values()[otherLogLevel]);
@@ -136,7 +137,7 @@ public class JniFrontend {
     }
     LOG.info(JniUtil.getJavaVersion());
 
-    frontend_ = new Frontend(authConfig);
+    frontend_ = new Frontend(authConfig, defaultKuduMasterHosts);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
new file mode 100644
index 0000000..bd6d0fe
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.service;
+
+import java.lang.NumberFormatException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.impala.analysis.ToSqlUtils;
+import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.TableNotFoundException;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TCreateTableParams;
+import org.apache.impala.thrift.TDistributeParam;
+import org.apache.impala.util.KuduUtil;
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.PartialRow;
+import org.apache.log4j.Logger;
+
+/**
+ * This is a helper for the CatalogOpExecutor to provide Kudu related DDL functionality
+ * such as creating and dropping tables from Kudu.
+ */
+public class KuduCatalogOpExecutor {
+  public static final Logger LOG = Logger.getLogger(KuduCatalogOpExecutor.class);
+
+  /**
+   * Creates a Kudu table whose schema mirrors the schema stored in 'msTbl'. The Kudu
+   * table name and the master addresses are read from the table parameters of 'msTbl'.
+   * If the table already exists in Kudu, this is a no-op when 'params.if_not_exists' is
+   * set and an error otherwise. 'msTbl' must not represent an external table (enforced
+   * by a precondition check). Any failure while talking to Kudu or building the table
+   * definition is reported as an ImpalaRuntimeException.
+   */
+  static void createManagedTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
+      TCreateTableParams params) throws ImpalaRuntimeException {
+    Preconditions.checkState(!Table.isExternalTable(msTbl));
+    String tblName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
+    String masters = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+    LOG.debug(String.format("Creating table '%s' in master '%s'", tblName, masters));
+    try (KuduClient client = new KuduClient.KuduClientBuilder(masters).build()) {
+      // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity.
+      // (see KUDU-1710).
+      boolean alreadyExists = client.tableExists(tblName);
+      if (alreadyExists && params.if_not_exists) return;
+      if (alreadyExists) {
+        throw new ImpalaRuntimeException(String.format(
+            "Table '%s' already exists in Kudu.", tblName));
+      }
+      // The schema is built first because the table options are derived from it.
+      Schema tblSchema = createTableSchema(msTbl, params);
+      CreateTableOptions opts = buildTableOptions(msTbl, params, tblSchema);
+      client.createTable(tblName, tblSchema, opts);
+    } catch (Exception e) {
+      throw new ImpalaRuntimeException(String.format("Error creating table '%s'",
+          tblName), e);
+    }
+  }
+
+  /**
+   * Builds the Kudu schema of a new table from the columns stored in 'msTbl'. Columns
+   * whose name appears in the primary key column names of 'params' become non-nullable
+   * key columns; every other column is a nullable non-key column. The columns keep the
+   * order they have in 'msTbl'. Throws an ImpalaRuntimeException if a column type
+   * cannot be converted to a Kudu type.
+   */
+  private static Schema createTableSchema(
+      org.apache.hadoop.hive.metastore.api.Table msTbl, TCreateTableParams params)
+      throws ImpalaRuntimeException {
+    Set<String> pkColNames = new HashSet<>(params.getPrimary_key_column_names());
+    List<FieldSchema> msCols = msTbl.getSd().getCols();
+    List<ColumnSchema> kuduCols = new ArrayList<>(msCols.size());
+    for (FieldSchema msCol : msCols) {
+      String colName = msCol.getName();
+      Type impalaType = Type.parseColumnType(msCol.getType());
+      Preconditions.checkState(impalaType != null);
+      org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(impalaType);
+      // Key columns are flagged as such and are the only non-nullable columns.
+      boolean isKey = pkColNames.contains(colName);
+      ColumnSchema kuduCol = new ColumnSchemaBuilder(colName, kuduType)
+          .key(isKey)
+          .nullable(!isKey)
+          .build();
+      kuduCols.add(kuduCol);
+    }
+    return new Schema(kuduCols);
+  }
+
+  /**
+   * Builds the table options of a new Kudu table: the distribution schemes taken from
+   * 'params' and, if present in the table parameters of 'msTbl', the number of tablet
+   * replicas. 'schema' is the Kudu schema of the new table; it is used to parse the
+   * split rows of a range distribution.
+   * Throws an ImpalaRuntimeException if the split rows cannot be parsed or if the
+   * replica count is not a positive integer.
+   */
+  private static CreateTableOptions buildTableOptions(
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      TCreateTableParams params, Schema schema) throws ImpalaRuntimeException {
+    CreateTableOptions tableOpts = new CreateTableOptions();
+    // Set the distribution schemes
+    List<TDistributeParam> distributeParams = params.getDistribute_by();
+    if (distributeParams != null) {
+      boolean hasRangePartitioning = false;
+      for (TDistributeParam distParam : distributeParams) {
+        if (distParam.isSetBy_hash_param()) {
+          // A distribute param is either hash-based or range-based, never both.
+          Preconditions.checkState(!distParam.isSetBy_range_param());
+          tableOpts.addHashPartitions(distParam.getBy_hash_param().getColumns(),
+              distParam.getBy_hash_param().getNum_buckets());
+        } else {
+          Preconditions.checkState(distParam.isSetBy_range_param());
+          hasRangePartitioning = true;
+          tableOpts.setRangePartitionColumns(
+              distParam.getBy_range_param().getColumns());
+          for (PartialRow partialRow :
+              KuduUtil.parseSplits(schema, distParam.getBy_range_param())) {
+            tableOpts.addSplitRow(partialRow);
+          }
+        }
+      }
+      // If no range-based distribution is specified in a CREATE TABLE statement, Kudu
+      // generates one by default that includes all the primary key columns. To prevent
+      // this from happening, explicitly set the range partition columns to be
+      // an empty list.
+      if (!hasRangePartitioning) {
+        tableOpts.setRangePartitionColumns(Collections.<String>emptyList());
+      }
+    }
+
+    // Set the number of table replicas, if specified.
+    String replication = msTbl.getParameters().get(KuduTable.KEY_TABLET_REPLICAS);
+    if (!Strings.isNullOrEmpty(replication)) {
+      int numReplicas;
+      try {
+        numReplicas = Integer.parseInt(replication);
+      } catch (NumberFormatException e) {
+        throw new ImpalaRuntimeException(String.format("Invalid number of table " +
+            "replicas specified: '%s'", replication), e);
+      }
+      // The value is read from a table property, so a non-positive count is reported
+      // with the same descriptive error as a non-numeric one instead of tripping an
+      // internal precondition check that carries no message.
+      if (numReplicas <= 0) {
+        throw new ImpalaRuntimeException(String.format("Invalid number of table " +
+            "replicas specified: '%s'", replication));
+      }
+      tableOpts.setNumReplicas(numReplicas);
+    }
+    return tableOpts;
+  }
+
+  /**
+   * Drops the table in Kudu. The Kudu table name and the master addresses are read
+   * from the table parameters of 'msTbl', which must not represent an external table.
+   * If the table does not exist and 'ifExists' is false, a TableNotFoundException is
+   * thrown. If the table exists and could not be dropped, or if Kudu could not be
+   * accessed, an ImpalaRuntimeException is thrown.
+   */
+  static void dropTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
+      boolean ifExists) throws ImpalaRuntimeException, TableNotFoundException {
+    Preconditions.checkState(!Table.isExternalTable(msTbl));
+    String tableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
+    String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+    LOG.debug(String.format("Dropping table '%s' from master '%s'", tableName,
+        masterHosts));
+    try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) {
+      Preconditions.checkState(!Strings.isNullOrEmpty(tableName));
+      // TODO: The IF EXISTS case should be handled by Kudu to ensure atomicity.
+      // (see KUDU-1710).
+      if (kudu.tableExists(tableName)) {
+        kudu.deleteTable(tableName);
+      } else if (!ifExists) {
+        throw new TableNotFoundException(String.format(
+            "Table '%s' does not exist in Kudu master(s) '%s'.", tableName, masterHosts));
+      }
+    } catch (TableNotFoundException e) {
+      // Propagate unchanged. Without this clause the generic handler below would wrap
+      // it in an ImpalaRuntimeException and callers could never observe the documented
+      // TableNotFoundException.
+      throw e;
+    } catch (Exception e) {
+      throw new ImpalaRuntimeException(String.format("Error dropping table '%s'",
+          tableName), e);
+    }
+  }
+
+  /**
+   * Reads the column definitions from a Kudu table and populates 'msTbl' with
+   * an equivalent schema. The Kudu table name and the master addresses are read from
+   * the table parameters of 'msTbl'. The columns of 'msTbl' are replaced only after the
+   * whole Kudu schema has been converted, so 'msTbl' is left unmodified if an error
+   * occurs. Throws an ImpalaRuntimeException if the table does not exist in Kudu, if a
+   * column type cannot be converted, or if any other error is encountered.
+   */
+  public static void populateColumnsFromKudu(
+      org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException {
+    List<FieldSchema> msCols = msTbl.getSd().getCols();
+    String kuduTableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
+    Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
+    String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+    LOG.debug(String.format("Loading schema of table '%s' from master '%s'",
+        kuduTableName, masterHosts));
+    // Collect the converted columns in a scratch list. This keeps 'msTbl' intact on
+    // failure without having to deep-copy the entire Metastore table object.
+    List<FieldSchema> kuduCols = new ArrayList<>();
+    try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) {
+      if (!kudu.tableExists(kuduTableName)) {
+        throw new ImpalaRuntimeException(String.format("Table does not exist in Kudu: " +
+            "'%s'", kuduTableName));
+      }
+      org.apache.kudu.client.KuduTable kuduTable = kudu.openTable(kuduTableName);
+      for (ColumnSchema colSchema : kuduTable.getSchema().getColumns()) {
+        Type type = KuduUtil.toImpalaType(colSchema.getType());
+        kuduCols.add(
+            new FieldSchema(colSchema.getName(), type.toSql().toLowerCase(), null));
+      }
+    } catch (Exception e) {
+      throw new ImpalaRuntimeException(String.format("Error loading schema of table " +
+          "'%s'", kuduTableName), e);
+    }
+    // Replace the columns in the Metastore table with the columns from the recently
+    // accessed Kudu schema.
+    msCols.clear();
+    msCols.addAll(kuduCols);
+  }
+
+  /**
+   * Validates the table properties of a Kudu table. It checks that the specified master
+   * addresses point to valid Kudu masters and that the table exists. The master
+   * addresses and the Kudu table name are read from the table parameters of 'msTbl' and
+   * must both be non-empty.
+   * Throws an ImpalaRuntimeException if Kudu cannot be queried or if the table does not
+   * exist.
+   */
+  public static void validateKuduTblExists(
+      org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException {
+    String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+    Preconditions.checkState(!Strings.isNullOrEmpty(masterHosts));
+    String kuduTableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
+    Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
+    String errMsg = String.format("Kudu table '%s' does not exist " +
+        "on master '%s'", kuduTableName, masterHosts);
+    boolean tableExists;
+    try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) {
+      tableExists = kudu.tableExists(kuduTableName);
+    } catch (Exception e) {
+      throw new ImpalaRuntimeException(errMsg, e);
+    }
+    // tableExists() reports a missing table through its return value rather than an
+    // exception, so the result has to be checked explicitly.
+    if (!tableExists) throw new ImpalaRuntimeException(errMsg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/util/KuduUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index b9f8653..a679032 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -18,14 +18,15 @@
 package org.apache.impala.util;
 
 import java.io.StringReader;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import javax.json.Json;
 import javax.json.JsonArray;
 import javax.json.JsonReader;
 
+import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.ScalarType;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TDistributeByRangeParam;
 import org.apache.impala.thrift.TRangeLiteral;
@@ -33,48 +34,17 @@ import org.apache.impala.thrift.TRangeLiteralList;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.PartialRow;
 
-import static org.apache.impala.catalog.Type.parseColumnType;
 import static java.lang.String.format;
 
 public class KuduUtil {
 
   private static final String SPLIT_KEYS_ERROR_MESSAGE = "Error parsing splits keys.";
-
-  /**
-   * Compare the schema of a HMS table and a Kudu table. Returns true if both tables have
-   * a matching schema.
-   */
-  public static boolean compareSchema(Table msTable, KuduTable kuduTable)
-      throws ImpalaRuntimeException {
-    List<FieldSchema> msFields = msTable.getSd().getCols();
-    List<ColumnSchema> kuduFields = kuduTable.getSchema().getColumns();
-    if (msFields.size() != kuduFields.size()) return false;
-
-    HashMap<String, ColumnSchema> kuduFieldMap = Maps.newHashMap();
-    for (ColumnSchema kuduField : kuduFields) {
-      kuduFieldMap.put(kuduField.getName().toUpperCase(), kuduField);
-    }
-
-    for (FieldSchema msField : msFields) {
-      ColumnSchema kuduField = kuduFieldMap.get(msField.getName().toUpperCase());
-      if (kuduField == null
-          || fromImpalaType(parseColumnType(msField.getType())) != kuduField.getType()) {
-        return false;
-      }
-    }
-
-    return true;
-  }
+  private static final String KUDU_TABLE_NAME_PREFIX = "impala::";
 
   /**
    * Parses split keys from statements.
@@ -145,10 +115,9 @@ public class KuduUtil {
   /**
    * Sets the value in 'key' at 'pos', given the json representation.
    */
-  private static void setKey(Type type, JsonArray array, int pos, PartialRow key)
-      throws ImpalaRuntimeException {
+  private static void setKey(org.apache.kudu.Type type, JsonArray array, int pos,
+      PartialRow key) throws ImpalaRuntimeException {
     switch (type) {
-      case BOOL: key.addBoolean(pos, array.getBoolean(pos)); break;
       case INT8: key.addByte(pos, (byte) array.getInt(pos)); break;
       case INT16: key.addShort(pos, (short) array.getInt(pos)); break;
       case INT32: key.addInt(pos, array.getInt(pos)); break;
@@ -163,13 +132,9 @@ public class KuduUtil {
   /**
    * Sets the value in 'key' at 'pos', given the range literal.
    */
-  private static void setKey(Type type, TRangeLiteral literal, int pos, String colName,
-      PartialRow key) throws ImpalaRuntimeException {
+  private static void setKey(org.apache.kudu.Type type, TRangeLiteral literal, int pos,
+      String colName, PartialRow key) throws ImpalaRuntimeException {
     switch (type) {
-      case BOOL:
-        checkCorrectType(literal.isSetBool_literal(), type, colName, literal);
-        key.addBoolean(pos, literal.isBool_literal());
-        break;
       case INT8:
         checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
         key.addByte(pos, (byte) literal.getInt_literal());
@@ -200,8 +165,8 @@ public class KuduUtil {
    * If correctType is true, returns. Otherwise throws a formatted error message
    * indicating problems with the type of the literal of the range literal.
    */
-  private static void checkCorrectType(boolean correctType, Type t, String colName,
-      TRangeLiteral literal) throws ImpalaRuntimeException {
+  private static void checkCorrectType(boolean correctType, org.apache.kudu.Type t,
+      String colName, TRangeLiteral literal) throws ImpalaRuntimeException {
     if (correctType) return;
     throw new ImpalaRuntimeException(
         format("Expected %s literal for column '%s' got '%s'", t.getName(), colName,
@@ -220,11 +185,24 @@ public class KuduUtil {
     return Lists.newArrayList(Splitter.on(",").trimResults().split(cols.toLowerCase()));
   }
 
+  /**
+   * Returns true if 'type' is an integer type or a string type, false otherwise.
+   * NOTE(review): going by the method name, these are the Impala types accepted for
+   * Kudu key columns; confirm against the callers.
+   */
+  public static boolean isSupportedKeyType(org.apache.impala.catalog.Type type) {
+    if (type.isIntegerType()) return true;
+    return type.isStringType();
+  }
+
+  /**
+   * Returns the name to use in Kudu when creating a table for which no custom name was
+   * provided. The name is KUDU_TABLE_NAME_PREFIX followed by
+   * '<metastoreDbName>.<metastoreTableName>'.
+   */
+  public static String getDefaultCreateKuduTableName(String metastoreDbName,
+      String metastoreTableName) {
+    StringBuilder kuduName = new StringBuilder(KUDU_TABLE_NAME_PREFIX);
+    kuduName.append(metastoreDbName).append(".").append(metastoreTableName);
+    return kuduName.toString();
+  }
+
   /**
    * Converts a given Impala catalog type to the Kudu type. Throws an exception if the
    * type cannot be converted.
    */
-  public static Type fromImpalaType(org.apache.impala.catalog.Type t)
+  public static org.apache.kudu.Type fromImpalaType(Type t)
       throws ImpalaRuntimeException {
     if (!t.isScalarType()) {
       throw new ImpalaRuntimeException(format(
@@ -232,16 +210,16 @@ public class KuduUtil {
     }
     ScalarType s = (ScalarType) t;
     switch (s.getPrimitiveType()) {
-      case TINYINT: return Type.INT8;
-      case SMALLINT: return Type.INT16;
-      case INT: return Type.INT32;
-      case BIGINT: return Type.INT64;
-      case BOOLEAN: return Type.BOOL;
-      case CHAR: return Type.STRING;
-      case STRING: return Type.STRING;
-      case VARCHAR: return Type.STRING;
-      case DOUBLE: return Type.DOUBLE;
-      case FLOAT: return Type.FLOAT;
+      case TINYINT: return org.apache.kudu.Type.INT8;
+      case SMALLINT: return org.apache.kudu.Type.INT16;
+      case INT: return org.apache.kudu.Type.INT32;
+      case BIGINT: return org.apache.kudu.Type.INT64;
+      case BOOLEAN: return org.apache.kudu.Type.BOOL;
+      case CHAR: return org.apache.kudu.Type.STRING;
+      case STRING: return org.apache.kudu.Type.STRING;
+      case VARCHAR: return org.apache.kudu.Type.STRING;
+      case DOUBLE: return org.apache.kudu.Type.DOUBLE;
+      case FLOAT: return org.apache.kudu.Type.FLOAT;
         /* Fall through below */
       case INVALID_TYPE:
       case NULL_TYPE:
@@ -256,11 +234,27 @@ public class KuduUtil {
     }
   }
 
+  /**
+   * Converts a given Kudu type to the corresponding Impala catalog type. Throws an
+   * ImpalaRuntimeException if the Kudu type has no mapping below.
+   */
+  public static Type toImpalaType(org.apache.kudu.Type t)
+      throws ImpalaRuntimeException {
+    switch (t) {
+      // Integer types.
+      case INT8: return Type.TINYINT;
+      case INT16: return Type.SMALLINT;
+      case INT32: return Type.INT;
+      case INT64: return Type.BIGINT;
+      // Floating point types.
+      case FLOAT: return Type.FLOAT;
+      case DOUBLE: return Type.DOUBLE;
+      // Remaining supported types.
+      case BOOL: return Type.BOOLEAN;
+      case STRING: return Type.STRING;
+      default: break;
+    }
+    throw new ImpalaRuntimeException(String.format(
+        "Kudu type %s is not supported in Impala", t));
+  }
+
   /**
    * Returns the string value of the RANGE literal.
    */
   static String toString(TRangeLiteral l) throws ImpalaRuntimeException {
-    if (l.isSetBool_literal()) return String.valueOf(l.bool_literal);
     if (l.isSetString_literal()) return String.valueOf(l.string_literal);
     if (l.isSetInt_literal()) return String.valueOf(l.int_literal);
     throw new ImpalaRuntimeException("Unsupported type for RANGE literal.");