You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:32 UTC

[15/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java b/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java
deleted file mode 100644
index 4814503..0000000
--- a/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java
+++ /dev/null
@@ -1,2990 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.service;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.PartitionDropOptions;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-
-import com.cloudera.impala.analysis.FunctionName;
-import com.cloudera.impala.analysis.TableName;
-import com.cloudera.impala.authorization.User;
-import com.cloudera.impala.catalog.Catalog;
-import com.cloudera.impala.catalog.CatalogException;
-import com.cloudera.impala.catalog.CatalogServiceCatalog;
-import com.cloudera.impala.catalog.Column;
-import com.cloudera.impala.catalog.ColumnNotFoundException;
-import com.cloudera.impala.catalog.DataSource;
-import com.cloudera.impala.catalog.DatabaseNotFoundException;
-import com.cloudera.impala.catalog.Db;
-import com.cloudera.impala.catalog.Function;
-import com.cloudera.impala.catalog.HBaseTable;
-import com.cloudera.impala.catalog.HdfsFileFormat;
-import com.cloudera.impala.catalog.HdfsPartition;
-import com.cloudera.impala.catalog.HdfsTable;
-import com.cloudera.impala.catalog.HiveStorageDescriptorFactory;
-import com.cloudera.impala.catalog.IncompleteTable;
-import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient;
-import com.cloudera.impala.catalog.PartitionNotFoundException;
-import com.cloudera.impala.catalog.PartitionStatsUtil;
-import com.cloudera.impala.catalog.Role;
-import com.cloudera.impala.catalog.RolePrivilege;
-import com.cloudera.impala.catalog.RowFormat;
-import com.cloudera.impala.catalog.ScalarFunction;
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.catalog.TableLoadingException;
-import com.cloudera.impala.catalog.TableNotFoundException;
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.catalog.View;
-import com.cloudera.impala.catalog.delegates.DdlDelegate;
-import com.cloudera.impala.catalog.delegates.KuduDdlDelegate;
-import com.cloudera.impala.catalog.delegates.UnsupportedOpDelegate;
-import com.cloudera.impala.common.FileSystemUtil;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.ImpalaRuntimeException;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.common.Pair;
-import com.cloudera.impala.thrift.ImpalaInternalServiceConstants;
-import com.cloudera.impala.thrift.JniCatalogConstants;
-import com.cloudera.impala.thrift.TAlterTableAddPartitionParams;
-import com.cloudera.impala.thrift.TAlterTableAddReplaceColsParams;
-import com.cloudera.impala.thrift.TAlterTableChangeColParams;
-import com.cloudera.impala.thrift.TAlterTableDropColParams;
-import com.cloudera.impala.thrift.TAlterTableDropPartitionParams;
-import com.cloudera.impala.thrift.TAlterTableParams;
-import com.cloudera.impala.thrift.TAlterTableSetCachedParams;
-import com.cloudera.impala.thrift.TAlterTableSetFileFormatParams;
-import com.cloudera.impala.thrift.TAlterTableSetLocationParams;
-import com.cloudera.impala.thrift.TAlterTableSetTblPropertiesParams;
-import com.cloudera.impala.thrift.TAlterTableType;
-import com.cloudera.impala.thrift.TAlterTableUpdateStatsParams;
-import com.cloudera.impala.thrift.TCatalogObject;
-import com.cloudera.impala.thrift.TCatalogObjectType;
-import com.cloudera.impala.thrift.TCatalogUpdateResult;
-import com.cloudera.impala.thrift.TColumn;
-import com.cloudera.impala.thrift.TColumnStats;
-import com.cloudera.impala.thrift.TColumnType;
-import com.cloudera.impala.thrift.TColumnValue;
-import com.cloudera.impala.thrift.TCreateDataSourceParams;
-import com.cloudera.impala.thrift.TCreateDbParams;
-import com.cloudera.impala.thrift.TCreateDropRoleParams;
-import com.cloudera.impala.thrift.TCreateFunctionParams;
-import com.cloudera.impala.thrift.TCreateOrAlterViewParams;
-import com.cloudera.impala.thrift.TCreateTableLikeParams;
-import com.cloudera.impala.thrift.TCreateTableParams;
-import com.cloudera.impala.thrift.TDatabase;
-import com.cloudera.impala.thrift.TDdlExecRequest;
-import com.cloudera.impala.thrift.TDdlExecResponse;
-import com.cloudera.impala.thrift.TDistributeParam;
-import com.cloudera.impala.thrift.TDropDataSourceParams;
-import com.cloudera.impala.thrift.TDropDbParams;
-import com.cloudera.impala.thrift.TDropFunctionParams;
-import com.cloudera.impala.thrift.TDropStatsParams;
-import com.cloudera.impala.thrift.TDropTableOrViewParams;
-import com.cloudera.impala.thrift.TErrorCode;
-import com.cloudera.impala.thrift.TFunctionBinaryType;
-import com.cloudera.impala.thrift.TGrantRevokePrivParams;
-import com.cloudera.impala.thrift.TGrantRevokeRoleParams;
-import com.cloudera.impala.thrift.THdfsCachingOp;
-import com.cloudera.impala.thrift.THdfsFileFormat;
-import com.cloudera.impala.thrift.TPartitionKeyValue;
-import com.cloudera.impala.thrift.TPartitionStats;
-import com.cloudera.impala.thrift.TPrivilege;
-import com.cloudera.impala.thrift.TResetMetadataRequest;
-import com.cloudera.impala.thrift.TResetMetadataResponse;
-import com.cloudera.impala.thrift.TResultRow;
-import com.cloudera.impala.thrift.TResultSet;
-import com.cloudera.impala.thrift.TResultSetMetadata;
-import com.cloudera.impala.thrift.TStatus;
-import com.cloudera.impala.thrift.TTable;
-import com.cloudera.impala.thrift.TTableName;
-import com.cloudera.impala.thrift.TTableStats;
-import com.cloudera.impala.thrift.TTruncateParams;
-import com.cloudera.impala.thrift.TUpdateCatalogRequest;
-import com.cloudera.impala.thrift.TUpdateCatalogResponse;
-import com.cloudera.impala.util.HdfsCachingUtil;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Class used to execute Catalog Operations, including DDL and refresh/invalidate
- * metadata requests. Acts as a bridge between the Thrift catalog operation requests
- * and the non-thrift Java Catalog objects.
- *
- * Updates are applied first to the Hive Metastore and only if they succeed, are then
- * applied to the catalog objects. To ensure consistency in the presence of failed HMS
- * updates, DDL operations should not directly modify the HMS objects of the catalog
- * objects but should operate on copies instead.
- *
- * The CatalogOpExecutor uses table-level locking to protect table metadata during
- * concurrent modifications and is responsible for assigning a new catalog version when
- * a table is modified (e.g. alterTable()).
- *
- * The following locking protocol is employed to ensure that modifying
- * the table metadata and assigning a new catalog version is performed atomically and
- * consistently in the presence of concurrent DDL operations:
- * 1. Acquire the catalog lock (see CatalogServiceCatalog.catalogLock_)
- * 2. Acquire a table lock
- * 3. Increment and get a new catalog version
- * 4. Release the catalog lock
- * 5. Modify table metadata
- * 6. Release table lock
- *
- * Operations that CREATE/DROP catalog objects such as tables and databases employ the
- * following locking protocol:
- * 1. Acquire the metastoreDdlLock_
- * 2. Update the Hive Metastore
- * 3. Increment and get a new catalog version
- * 4. Update the catalog
- * 5. Release the metastoreDdlLock_
- *
- * It is imperative that other operations that need to hold both the catalog lock and
- * table locks at the same time follow the same locking protocol and acquire these
- * locks in that particular order. Also, operations that modify table metadata
- * (e.g. alter table statements) should not acquire the metastoreDdlLock_.
- *
- * TODO: Refactor the CatalogOpExecutor and CatalogServiceCatalog classes and consolidate
- * the locking protocol into a single class.
- *
- * TODO: Improve catalog's consistency guarantees by using a hierarchical locking scheme.
- * Currently, only concurrent modidications to table metadata are guaranteed to be
- * serialized. Concurrent DDL operations that DROP/ADD catalog objects,
- * especially in the presence of INVALIDATE METADATA and REFRESH, are not guaranteed to
- * be consistent (see IMPALA-2774).
- *
- * TODO: Create a Hive Metastore utility class to move code that interacts with the
- * metastore out of this class.
- */
-public class CatalogOpExecutor {
-  // Format string for exceptions returned by Hive Metastore RPCs.
-  private final static String HMS_RPC_ERROR_FORMAT_STR =
-      "Error making '%s' RPC to Hive Metastore: ";
-
-  private final CatalogServiceCatalog catalog_;
-
-  // Lock used to ensure that CREATE[DROP] TABLE[DATABASE] operations performed in
-  // catalog_ and the corresponding RPC to apply the change in HMS are atomic.
-  private final Object metastoreDdlLock_ = new Object();
-  private static final Logger LOG = Logger.getLogger(CatalogOpExecutor.class);
-
-  // The maximum number of partitions to update in one Hive Metastore RPC.
-  // Used when persisting the results of COMPUTE STATS statements.
-  private final static short MAX_PARTITION_UPDATES_PER_RPC = 500;
-
-  public CatalogOpExecutor(CatalogServiceCatalog catalog) {
-    catalog_ = catalog;
-  }
-
-  public TDdlExecResponse execDdlRequest(TDdlExecRequest ddlRequest)
-      throws ImpalaException {
-    TDdlExecResponse response = new TDdlExecResponse();
-    response.setResult(new TCatalogUpdateResult());
-    response.getResult().setCatalog_service_id(JniCatalog.getServiceId());
-    User requestingUser = null;
-    if (ddlRequest.isSetHeader()) {
-      requestingUser = new User(ddlRequest.getHeader().getRequesting_user());
-    }
-
-    switch (ddlRequest.ddl_type) {
-      case ALTER_TABLE:
-        alterTable(ddlRequest.getAlter_table_params(), response);
-        break;
-      case ALTER_VIEW:
-        alterView(ddlRequest.getAlter_view_params(), response);
-        break;
-      case CREATE_DATABASE:
-        createDatabase(ddlRequest.getCreate_db_params(), response);
-        break;
-      case CREATE_TABLE_AS_SELECT:
-        response.setNew_table_created(
-            createTable(ddlRequest.getCreate_table_params(), response));
-        break;
-      case CREATE_TABLE:
-        createTable(ddlRequest.getCreate_table_params(), response);
-        break;
-      case CREATE_TABLE_LIKE:
-        createTableLike(ddlRequest.getCreate_table_like_params(), response);
-        break;
-      case CREATE_VIEW:
-        createView(ddlRequest.getCreate_view_params(), response);
-        break;
-      case CREATE_FUNCTION:
-        createFunction(ddlRequest.getCreate_fn_params(), response);
-        break;
-      case CREATE_DATA_SOURCE:
-        createDataSource(ddlRequest.getCreate_data_source_params(), response);
-        break;
-      case COMPUTE_STATS:
-        Preconditions.checkState(false, "Compute stats should trigger an ALTER TABLE.");
-        break;
-      case DROP_STATS:
-        dropStats(ddlRequest.getDrop_stats_params(), response);
-        break;
-      case DROP_DATABASE:
-        dropDatabase(ddlRequest.getDrop_db_params(), response);
-        break;
-      case DROP_TABLE:
-      case DROP_VIEW:
-        dropTableOrView(ddlRequest.getDrop_table_or_view_params(), response);
-        break;
-      case TRUNCATE_TABLE:
-        truncateTable(ddlRequest.getTruncate_params(), response);
-        break;
-      case DROP_FUNCTION:
-        dropFunction(ddlRequest.getDrop_fn_params(), response);
-        break;
-      case DROP_DATA_SOURCE:
-        dropDataSource(ddlRequest.getDrop_data_source_params(), response);
-        break;
-      case CREATE_ROLE:
-      case DROP_ROLE:
-        createDropRole(requestingUser, ddlRequest.getCreate_drop_role_params(),
-            response);
-        break;
-      case GRANT_ROLE:
-      case REVOKE_ROLE:
-        grantRevokeRoleGroup(requestingUser, ddlRequest.getGrant_revoke_role_params(),
-            response);
-        break;
-      case GRANT_PRIVILEGE:
-      case REVOKE_PRIVILEGE:
-        grantRevokeRolePrivilege(requestingUser,
-            ddlRequest.getGrant_revoke_priv_params(), response);
-        break;
-      default: throw new IllegalStateException("Unexpected DDL exec request type: " +
-          ddlRequest.ddl_type);
-    }
-
-    // For responses that contain updates to catalog objects, check that the response
-    // either exclusively uses the single updated/removed field or the corresponding list
-    // versions of the fields, but not a mix.
-    // The non-list version of the fields are maintained for backwards compatibility,
-    // e.g., BDR relies on a stable catalog API.
-    TCatalogUpdateResult result = response.getResult();
-    Preconditions.checkState(!
-        ((result.isSetUpdated_catalog_object_DEPRECATED()
-        || result.isSetRemoved_catalog_object_DEPRECATED())
-        &&
-        (result.isSetUpdated_catalog_objects()
-        || result.isSetRemoved_catalog_objects())));
-
-    // At this point, the operation is considered successful. If any errors occurred
-    // during execution, this function will throw an exception and the CatalogServer
-    // will handle setting a bad status code.
-    response.getResult().setStatus(new TStatus(TErrorCode.OK, new ArrayList<String>()));
-    return response;
-  }
-
-  /**
-   * Execute the ALTER TABLE command according to the TAlterTableParams and refresh the
-   * table metadata, except for RENAME, ADD PARTITION and DROP PARTITION. This call is
-   * thread-safe, i.e. concurrent operations on the same table are serialized.
-   */
-  private void alterTable(TAlterTableParams params, TDdlExecResponse response)
-      throws ImpalaException {
-    // When true, loads the file/block metadata.
-    boolean reloadFileMetadata = false;
-    // When true, loads the table schema and the column stats from the Hive Metastore.
-    boolean reloadTableSchema = false;
-
-    TableName tableName = TableName.fromThrift(params.getTable_name());
-    Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl());
-    catalog_.getLock().writeLock().lock();
-    synchronized (tbl) {
-      if (params.getAlter_type() == TAlterTableType.RENAME_VIEW
-          || params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
-        // RENAME is implemented as an ADD + DROP, so we need to execute it as we hold
-        // the catalog lock.
-        try {
-          alterTableOrViewRename(tbl,
-              TableName.fromThrift(params.getRename_params().getNew_table_name()),
-              response);
-          return;
-        } finally {
-          catalog_.getLock().writeLock().unlock();
-        }
-      }
-      // Get a new catalog version to assign to the table being altered.
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      catalog_.getLock().writeLock().unlock();
-      switch (params.getAlter_type()) {
-        case ADD_REPLACE_COLUMNS:
-          TAlterTableAddReplaceColsParams addReplaceColParams =
-              params.getAdd_replace_cols_params();
-          alterTableAddReplaceCols(tbl, addReplaceColParams.getColumns(),
-              addReplaceColParams.isReplace_existing_cols());
-          reloadTableSchema = true;
-          break;
-        case ADD_PARTITION:
-          TAlterTableAddPartitionParams addPartParams = params.getAdd_partition_params();
-          // Create and add HdfsPartition object to the corresponding HdfsTable and load
-          // its block metadata. Get the new table object with an updated catalog
-          // version. If the partition already exists in Hive and "IfNotExists" is true,
-          // then return without populating the response object.
-          Table refreshedTable = alterTableAddPartition(tbl,
-              addPartParams.getPartition_spec(), addPartParams.isIf_not_exists(),
-              addPartParams.getLocation(), addPartParams.getCache_op());
-          if (refreshedTable != null) {
-            refreshedTable.setCatalogVersion(newCatalogVersion);
-            addTableToCatalogUpdate(refreshedTable, response.result);
-          }
-          return;
-        case DROP_COLUMN:
-          TAlterTableDropColParams dropColParams = params.getDrop_col_params();
-          alterTableDropCol(tbl, dropColParams.getCol_name());
-          reloadTableSchema = true;
-          break;
-        case CHANGE_COLUMN:
-          TAlterTableChangeColParams changeColParams = params.getChange_col_params();
-          alterTableChangeCol(tbl, changeColParams.getCol_name(),
-              changeColParams.getNew_col_def());
-          reloadTableSchema = true;
-          break;
-        case DROP_PARTITION:
-          TAlterTableDropPartitionParams dropPartParams =
-              params.getDrop_partition_params();
-          // Drop the partition from the corresponding table. Get the table object
-          // with an updated catalog version. If the partition does not exist and
-          // "IfExists" is true, then return without populating the response object.
-          // If "purge" option is specified partition data is purged by skipping
-          // Trash, if configured.
-          refreshedTable = alterTableDropPartition(tbl,
-              dropPartParams.getPartition_spec(),
-              dropPartParams.isIf_exists(), dropPartParams.isPurge());
-          if (refreshedTable != null) {
-            refreshedTable.setCatalogVersion(newCatalogVersion);
-            addTableToCatalogUpdate(refreshedTable, response.result);
-          }
-          return;
-        case RENAME_TABLE:
-        case RENAME_VIEW:
-          Preconditions.checkState(false,
-              "RENAME TABLE/VIEW operation has been processed");
-          return;
-        case SET_FILE_FORMAT:
-          TAlterTableSetFileFormatParams fileFormatParams =
-              params.getSet_file_format_params();
-          List<TPartitionKeyValue> fileFormatPartitionSpec = null;
-          if (fileFormatParams.isSetPartition_spec()) {
-            fileFormatPartitionSpec = fileFormatParams.getPartition_spec();
-          }
-          reloadFileMetadata = alterTableSetFileFormat(tbl, fileFormatPartitionSpec,
-              fileFormatParams.getFile_format());
-          break;
-        case SET_LOCATION:
-          TAlterTableSetLocationParams setLocationParams =
-              params.getSet_location_params();
-          List<TPartitionKeyValue> partitionSpec = null;
-          if (setLocationParams.isSetPartition_spec()) {
-            partitionSpec = setLocationParams.getPartition_spec();
-          }
-          reloadFileMetadata = alterTableSetLocation(tbl, partitionSpec,
-              setLocationParams.getLocation());
-          break;
-        case SET_TBL_PROPERTIES:
-          alterTableSetTblProperties(tbl, params.getSet_tbl_properties_params());
-          break;
-        case UPDATE_STATS:
-          Preconditions.checkState(params.isSetUpdate_stats_params());
-          alterTableUpdateStats(tbl, params.getUpdate_stats_params(), response);
-          reloadTableSchema = true;
-          break;
-        case SET_CACHED:
-          Preconditions.checkState(params.isSetSet_cached_params());
-          if (params.getSet_cached_params().getPartition_spec() == null) {
-            reloadFileMetadata = alterTableSetCached(tbl, params.getSet_cached_params());
-          } else {
-            alterPartitionSetCached(tbl, params.getSet_cached_params());
-          }
-          break;
-        case RECOVER_PARTITIONS:
-          alterTableRecoverPartitions(tbl);
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              "Unknown ALTER TABLE operation type: " + params.getAlter_type());
-      }
-
-      loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata, reloadTableSchema,
-          null);
-      addTableToCatalogUpdate(tbl, response.result);
-    } // end of synchronized block
-  }
-
-  /**
-   * Loads the metadata of a table 'tbl' and assigns a new catalog version.
-   * reloadFileMetadata', 'reloadTableSchema', and 'partitionsToUpdate'
-   * are used only for HdfsTables and control which metadata to reload.
-   * Throws a CatalogException if there is an error loading table metadata.
-   */
-  private void loadTableMetadata(Table tbl, long newCatalogVersion,
-      boolean reloadFileMetadata, boolean reloadTableSchema,
-      Set<String> partitionsToUpdate) throws CatalogException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      org.apache.hadoop.hive.metastore.api.Table msTbl =
-          getMetaStoreTable(msClient, tbl);
-      if (tbl instanceof HdfsTable) {
-        ((HdfsTable) tbl).load(true, msClient.getHiveClient(), msTbl,
-            reloadFileMetadata, reloadTableSchema, partitionsToUpdate);
-      } else {
-        tbl.load(true, msClient.getHiveClient(), msTbl);
-      }
-    }
-    tbl.setCatalogVersion(newCatalogVersion);
-  }
-
-  /**
-   * Serializes and adds table 'tbl' to a TCatalogUpdateResult object. Uses the
-   * version of the serialized table as the version of the catalog update result.
-   */
-  private static void addTableToCatalogUpdate(Table tbl, TCatalogUpdateResult result) {
-    TCatalogObject updatedCatalogObject = TableToTCatalogObject(tbl);
-    result.setUpdated_catalog_object_DEPRECATED(TableToTCatalogObject(tbl));
-    result.setVersion(updatedCatalogObject.getCatalog_version());
-  }
-
-  /**
-   * Creates a new HdfsPartition object and adds it to the corresponding HdfsTable.
-   * Does not create the object in the Hive metastore.
-   */
-  private Table addHdfsPartition(Table tbl, Partition partition)
-      throws CatalogException {
-    Preconditions.checkNotNull(tbl);
-    Preconditions.checkNotNull(partition);
-    if (!(tbl instanceof HdfsTable)) {
-      throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
-    }
-    HdfsTable hdfsTable = (HdfsTable) tbl;
-    HdfsPartition hdfsPartition = hdfsTable.createPartition(partition.getSd(), partition);
-    return catalog_.addPartition(hdfsPartition);
-  }
-
-  /**
-   * Alters an existing view's definition in the metastore. Throws an exception
-   * if the view does not exist or if the existing metadata entry is
-   * a table instead of a a view.
-   */
-   private void alterView(TCreateOrAlterViewParams params, TDdlExecResponse resp)
-      throws ImpalaException {
-    TableName tableName = TableName.fromThrift(params.getView_name());
-    Preconditions.checkState(tableName != null && tableName.isFullyQualified());
-    Preconditions.checkState(params.getColumns() != null &&
-        params.getColumns().size() > 0,
-          "Null or empty column list given as argument to DdlExecutor.alterView");
-    Table tbl = catalog_.getTable(tableName.getDb(), tableName.getTbl());
-    Preconditions.checkState(tbl instanceof View);
-    catalog_.getLock().writeLock().lock();
-    synchronized(tbl) {
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      catalog_.getLock().writeLock().unlock();
-      // Operate on a copy of the metastore table to avoid prematurely applying the
-      // alteration to our cached table in case the actual alteration fails.
-      org.apache.hadoop.hive.metastore.api.Table msTbl =
-          tbl.getMetaStoreTable().deepCopy();
-      if (!msTbl.getTableType().equalsIgnoreCase((TableType.VIRTUAL_VIEW.toString()))) {
-        throw new ImpalaRuntimeException(
-            String.format("ALTER VIEW not allowed on a table: %s",
-                tableName.toString()));
-      }
-
-      // Set the altered view attributes and update the metastore.
-      setViewAttributes(params, msTbl);
-      LOG.debug(String.format("Altering view %s", tableName));
-      applyAlterTable(msTbl);
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-        tbl.load(true, msClient.getHiveClient(), msTbl);
-      }
-      tbl.setCatalogVersion(newCatalogVersion);
-      addTableToCatalogUpdate(tbl, resp.result);
-    }
-  }
-
-  /**
-   * Alters an existing table's table and/or column statistics. Partitions are updated
-   * in batches of size 'MAX_PARTITION_UPDATES_PER_RPC'.
-   */
-  private void alterTableUpdateStats(Table table, TAlterTableUpdateStatsParams params,
-      TDdlExecResponse resp) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(table));
-    if (params.isSetTable_stats()) {
-      // Updating table and column stats via COMPUTE STATS.
-      Preconditions.checkState(
-          params.isSetPartition_stats() && params.isSetTable_stats());
-    } else {
-      // Only changing column stats via ALTER TABLE SET COLUMN STATS.
-      Preconditions.checkState(params.isSetColumn_stats());
-    }
-
-    TableName tableName = table.getTableName();
-    Preconditions.checkState(tableName != null && tableName.isFullyQualified());
-    LOG.info(String.format("Updating table stats for: %s", tableName));
-
-    // Deep copy the msTbl to avoid updating our cache before successfully persisting
-    // the results to the metastore.
-    org.apache.hadoop.hive.metastore.api.Table msTbl =
-        table.getMetaStoreTable().deepCopy();
-    List<HdfsPartition> partitions = Lists.newArrayList();
-    if (table instanceof HdfsTable) {
-      // Build a list of non-default partitions to update.
-      HdfsTable hdfsTable = (HdfsTable) table;
-      for (HdfsPartition p: hdfsTable.getPartitions()) {
-        if (!p.isDefaultPartition()) partitions.add(p);
-      }
-    }
-
-    int numTargetedPartitions = 0;
-    int numUpdatedColumns = 0;
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      // Update the table and partition row counts based on the query results.
-      List<HdfsPartition> modifiedParts = Lists.newArrayList();
-      if (params.isSetTable_stats()) {
-        numTargetedPartitions = updateTableStats(table, params, msTbl, partitions,
-            modifiedParts);
-      }
-
-      ColumnStatistics colStats = null;
-      if (params.isSetColumn_stats()) {
-        // Create Hive column stats from the query results.
-        colStats = createHiveColStats(params.getColumn_stats(), table);
-        numUpdatedColumns = colStats.getStatsObjSize();
-      }
-
-      // Update all partitions.
-      bulkAlterPartitions(table.getDb().getName(), table.getName(), modifiedParts);
-      if (numUpdatedColumns > 0) {
-        Preconditions.checkNotNull(colStats);
-        // Update column stats.
-        try {
-          msClient.getHiveClient().updateTableColumnStatistics(colStats);
-        } catch (Exception e) {
-          throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
-                  "updateTableColumnStatistics"), e);
-        }
-      }
-      // Update the table stats. Apply the table alteration last to ensure the
-      // lastDdlTime is as accurate as possible.
-      applyAlterTable(msTbl);
-    }
-
-    // Set the results to be reported to the client.
-    TResultSet resultSet = new TResultSet();
-    resultSet.setSchema(new TResultSetMetadata(Lists.newArrayList(
-        new TColumn("summary", Type.STRING.toThrift()))));
-    TColumnValue resultColVal = new TColumnValue();
-    resultColVal.setString_val("Updated " + numTargetedPartitions + " partition(s) and " +
-        numUpdatedColumns + " column(s).");
-    TResultRow resultRow = new TResultRow();
-    resultRow.setColVals(Lists.newArrayList(resultColVal));
-    resultSet.setRows(Lists.newArrayList(resultRow));
-    resp.setResult_set(resultSet);
-  }
-
-  /**
-   * Updates the row counts of the given Hive partitions and the total row count of the
-   * given Hive table based on the given update stats parameters. The partitions whose
-   * row counts have not changed are skipped. The modified partitions are returned
-   * in the modifiedParts parameter.
-   * Row counts for missing or new partitions as a result of concurrent table
-   * alterations are set to 0.
-   * Returns the number of partitions that were targeted for update (includes partitions
-   * whose row counts have not changed).
-   */
-  private int updateTableStats(Table table, TAlterTableUpdateStatsParams params,
-      org.apache.hadoop.hive.metastore.api.Table msTbl,
-      List<HdfsPartition> partitions, List<HdfsPartition> modifiedParts)
-      throws ImpalaException {
-    Preconditions.checkState(params.isSetPartition_stats());
-    Preconditions.checkState(params.isSetTable_stats());
-    // Update the partitions' ROW_COUNT parameter.
-    int numTargetedPartitions = 0;
-    for (HdfsPartition partition: partitions) {
-      // NULL keys are returned as 'NULL' in the partition_stats map, so don't substitute
-      // this partition's keys with Hive's replacement value.
-      List<String> partitionValues = partition.getPartitionValuesAsStrings(false);
-      TPartitionStats partitionStats = params.partition_stats.get(partitionValues);
-      if (partitionStats == null) {
-        // No stats were collected for this partition. This means that it was not included
-        // in the original computation statements. If the backend does not find any rows
-        // for a partition that should be included, it will generate an empty
-        // TPartitionStats object.
-        if (params.expect_all_partitions == false) continue;
-
-        // If all partitions are expected, fill in any missing stats with an empty entry.
-        partitionStats = new TPartitionStats();
-        if (params.is_incremental) {
-          partitionStats.intermediate_col_stats = Maps.newHashMap();
-        }
-        partitionStats.stats = new TTableStats();
-        partitionStats.stats.setNum_rows(0L);
-      }
-
-      // Unconditionally update the partition stats and row count, even if the partition
-      // already has identical ones. This behavior results in possibly redundant work,
-      // but it is predictable and easy to reason about because it does not depend on the
-      // existing state of the metadata. See IMPALA-2201.
-      long numRows = partitionStats.stats.num_rows;
-      LOG.debug(String.format("Updating stats for partition %s: numRows=%s",
-          partition.getValuesAsString(), numRows));
-      PartitionStatsUtil.partStatsToParameters(partitionStats, partition);
-      partition.putToParameters(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
-      partition.putToParameters(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK,
-          StatsSetupConst.TRUE);
-      ++numTargetedPartitions;
-      modifiedParts.add(partition);
-    }
-
-    // For unpartitioned tables and HBase tables report a single updated partition.
-    if (table.getNumClusteringCols() == 0 || table instanceof HBaseTable) {
-      numTargetedPartitions = 1;
-      if (table instanceof HdfsTable) {
-        Preconditions.checkState(modifiedParts.size() == 1);
-        // Delete stats for this partition as they are included in table stats.
-        PartitionStatsUtil.deletePartStats(modifiedParts.get(0));
-      }
-    }
-
-    // Update the table's ROW_COUNT parameter.
-    msTbl.putToParameters(StatsSetupConst.ROW_COUNT,
-        String.valueOf(params.getTable_stats().num_rows));
-    msTbl.putToParameters(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK,
-        StatsSetupConst.TRUE);
-    return numTargetedPartitions;
-  }
-
-  /**
-   * Create Hive column statistics for the given table based on the give map from column
-   * name to column stats. Missing or new columns as a result of concurrent table
-   * alterations are ignored.
-   */
-  private static ColumnStatistics createHiveColStats(
-      Map<String, TColumnStats> columnStats, Table table) {
-    // Collection of column statistics objects to be returned.
-    ColumnStatistics colStats = new ColumnStatistics();
-    colStats.setStatsDesc(
-        new ColumnStatisticsDesc(true, table.getDb().getName(), table.getName()));
-    // Generate Hive column stats objects from the update stats params.
-    for (Map.Entry<String, TColumnStats> entry: columnStats.entrySet()) {
-      String colName = entry.getKey();
-      Column tableCol = table.getColumn(entry.getKey());
-      // Ignore columns that were dropped in the meantime.
-      if (tableCol == null) continue;
-      ColumnStatisticsData colStatsData =
-          createHiveColStatsData(entry.getValue(), tableCol.getType());
-      if (colStatsData == null) continue;
-      LOG.debug(String.format("Updating column stats for %s: numDVs=%s numNulls=%s " +
-          "maxSize=%s avgSize=%s", colName, entry.getValue().getNum_distinct_values(),
-          entry.getValue().getNum_nulls(), entry.getValue().getMax_size(),
-          entry.getValue().getAvg_size()));
-      ColumnStatisticsObj colStatsObj = new ColumnStatisticsObj(colName,
-          tableCol.getType().toString(), colStatsData);
-      colStats.addToStatsObj(colStatsObj);
-    }
-    return colStats;
-  }
-
-  private static ColumnStatisticsData createHiveColStatsData(TColumnStats colStats,
-      Type colType) {
-    ColumnStatisticsData colStatsData = new ColumnStatisticsData();
-    long ndvs = colStats.getNum_distinct_values();
-    long numNulls = colStats.getNum_nulls();
-    switch(colType.getPrimitiveType()) {
-      case BOOLEAN:
-        // TODO: Gather and set the numTrues and numFalse stats as well. The planner
-        // currently does not rely on them.
-        colStatsData.setBooleanStats(new BooleanColumnStatsData(1, -1, numNulls));
-        break;
-      case TINYINT:
-      case SMALLINT:
-      case INT:
-      case BIGINT:
-      case TIMESTAMP: // Hive and Impala use LongColumnStatsData for timestamps.
-        // TODO: Gather and set the min/max values stats as well. The planner
-        // currently does not rely on them.
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndvs));
-        break;
-      case FLOAT:
-      case DOUBLE:
-        // TODO: Gather and set the min/max values stats as well. The planner
-        // currently does not rely on them.
-        colStatsData.setDoubleStats(new DoubleColumnStatsData(numNulls, ndvs));
-        break;
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        long maxStrLen = colStats.getMax_size();
-        double avgStrLen = colStats.getAvg_size();
-        colStatsData.setStringStats(
-            new StringColumnStatsData(maxStrLen, avgStrLen, numNulls, ndvs));
-        break;
-      case DECIMAL:
-        // TODO: Gather and set the min/max values stats as well. The planner
-        // currently does not rely on them.
-        colStatsData.setDecimalStats(
-            new DecimalColumnStatsData(numNulls, ndvs));
-        break;
-      default:
-        return null;
-    }
-    return colStatsData;
-  }
-
-  /**
-   * Creates a new database in the metastore and adds the db name to the internal
-   * metadata cache, marking its metadata to be lazily loaded on the next access.
-   * Re-throws any Hive Meta Store exceptions encountered during the create, these
-   * may vary depending on the Meta Store connection type (thrift vs direct db).
-   */
-  private void createDatabase(TCreateDbParams params, TDdlExecResponse resp)
-      throws ImpalaException {
-    Preconditions.checkNotNull(params);
-    String dbName = params.getDb();
-    Preconditions.checkState(dbName != null && !dbName.isEmpty(),
-        "Null or empty database name passed as argument to Catalog.createDatabase");
-    if (params.if_not_exists && catalog_.getDb(dbName) != null) {
-      LOG.debug("Skipping database creation because " + dbName + " already exists and " +
-          "IF NOT EXISTS was specified.");
-      resp.getResult().setVersion(catalog_.getCatalogVersion());
-      return;
-    }
-    org.apache.hadoop.hive.metastore.api.Database db =
-        new org.apache.hadoop.hive.metastore.api.Database();
-    db.setName(dbName);
-    if (params.getComment() != null) {
-      db.setDescription(params.getComment());
-    }
-    if (params.getLocation() != null) {
-      db.setLocationUri(params.getLocation());
-    }
-    LOG.debug("Creating database " + dbName);
-    Db newDb = null;
-    synchronized (metastoreDdlLock_) {
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-        try {
-          msClient.getHiveClient().createDatabase(db);
-          newDb = catalog_.addDb(dbName, db);
-        } catch (AlreadyExistsException e) {
-          if (!params.if_not_exists) {
-            throw new ImpalaRuntimeException(
-                String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e);
-          }
-          LOG.debug(String.format("Ignoring '%s' when creating database %s because " +
-              "IF NOT EXISTS was specified.", e, dbName));
-          newDb = catalog_.getDb(dbName);
-          if (newDb == null) {
-            try {
-              org.apache.hadoop.hive.metastore.api.Database msDb =
-                  msClient.getHiveClient().getDatabase(dbName);
-              newDb = catalog_.addDb(dbName, msDb);
-            } catch (TException e1) {
-              throw new ImpalaRuntimeException(
-                  String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e1);
-            }
-          }
-        } catch (TException e) {
-          throw new ImpalaRuntimeException(
-              String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e);
-        }
-      }
-
-      Preconditions.checkNotNull(newDb);
-      TCatalogObject thriftDb = new TCatalogObject(
-          TCatalogObjectType.DATABASE, Catalog.INITIAL_CATALOG_VERSION);
-      thriftDb.setDb(newDb.toThrift());
-      thriftDb.setCatalog_version(newDb.getCatalogVersion());
-      resp.result.setUpdated_catalog_object_DEPRECATED(thriftDb);
-    }
-    resp.result.setVersion(
-        resp.result.getUpdated_catalog_object_DEPRECATED().getCatalog_version());
-  }
-
-  private TCatalogObject buildTCatalogFnObject(Function fn) {
-    TCatalogObject result = new TCatalogObject();
-    result.setType(TCatalogObjectType.FUNCTION);
-    result.setFn(fn.toThrift());
-    result.setCatalog_version(fn.getCatalogVersion());
-    return result;
-  }
-
- private void createFunction(TCreateFunctionParams params, TDdlExecResponse resp)
-      throws ImpalaException {
-    Function fn = Function.fromThrift(params.getFn());
-    LOG.debug(String.format("Adding %s: %s",
-        fn.getClass().getSimpleName(), fn.signatureString()));
-    boolean isPersistentJavaFn =
-        (fn.getBinaryType() == TFunctionBinaryType.JAVA) && fn.isPersistent();
-    synchronized (metastoreDdlLock_) {
-      Db db = catalog_.getDb(fn.dbName());
-      if (db == null) {
-        throw new CatalogException("Database: " + fn.dbName() + " does not exist.");
-      }
-      // Search for existing functions with the same name or signature that would
-      // conflict with the function being added.
-      for (Function function: db.getFunctions(fn.functionName())) {
-        if (isPersistentJavaFn || (function.isPersistent() &&
-            (function.getBinaryType() == TFunctionBinaryType.JAVA)) ||
-                function.compare(fn, Function.CompareMode.IS_INDISTINGUISHABLE)) {
-          if (!params.if_not_exists) {
-            throw new CatalogException("Function " + fn.functionName() +
-                " already exists.");
-          }
-          return;
-        }
-      }
-
-      List<TCatalogObject> addedFunctions = Lists.newArrayList();
-      if (isPersistentJavaFn) {
-        // For persistent Java functions we extract all supported function signatures from
-        // the corresponding Jar and add each signature to the catalog.
-        Preconditions.checkState(fn instanceof ScalarFunction);
-        org.apache.hadoop.hive.metastore.api.Function hiveFn =
-            ((ScalarFunction)fn).toHiveFunction();
-        List<Function> funcs = CatalogServiceCatalog.extractFunctions(fn.dbName(), hiveFn);
-        if (funcs.isEmpty()) {
-          throw new CatalogException(
-            "No compatible function signatures found in class: " + hiveFn.getClassName());
-        }
-        if (addJavaFunctionToHms(fn.dbName(), hiveFn, params.if_not_exists)) {
-          LOG.info("Funcs size:" + funcs.size());
-          for (Function addedFn: funcs) {
-            LOG.info(String.format("Adding function: %s.%s", addedFn.dbName(),
-                addedFn.signatureString()));
-            Preconditions.checkState(catalog_.addFunction(addedFn));
-            addedFunctions.add(buildTCatalogFnObject(addedFn));
-          }
-        }
-      } else {
-        if (catalog_.addFunction(fn)) {
-          // Flush DB changes to metastore
-          applyAlterDatabase(catalog_.getDb(fn.dbName()));
-          addedFunctions.add(buildTCatalogFnObject(fn));
-        }
-      }
-
-      if (!addedFunctions.isEmpty()) {
-        // Distinguish which result field to set based on the type of function being
-        // added for backwards compatibility. For example, BDR relies on a stable
-        // catalog Thrift API.
-        if (isPersistentJavaFn) {
-          // Only persistent Java UDFs can update multiple catalog objects.
-          resp.result.setUpdated_catalog_objects(addedFunctions);
-        } else {
-          Preconditions.checkState(addedFunctions.size() == 1);
-          resp.result.setUpdated_catalog_object_DEPRECATED(addedFunctions.get(0));
-        }
-        resp.result.setVersion(catalog_.getCatalogVersion());
-      }
-    }
-  }
-
-  private void createDataSource(TCreateDataSourceParams params, TDdlExecResponse resp)
-      throws ImpalaException {
-    if (LOG.isDebugEnabled()) { LOG.debug("Adding DATA SOURCE: " + params.toString()); }
-    DataSource dataSource = DataSource.fromThrift(params.getData_source());
-    if (catalog_.getDataSource(dataSource.getName()) != null) {
-      if (!params.if_not_exists) {
-        throw new ImpalaRuntimeException("Data source " + dataSource.getName() +
-            " already exists.");
-      }
-      // The user specified IF NOT EXISTS and the data source exists, just
-      // return the current catalog version.
-      resp.result.setVersion(catalog_.getCatalogVersion());
-      return;
-    }
-    catalog_.addDataSource(dataSource);
-    TCatalogObject addedObject = new TCatalogObject();
-    addedObject.setType(TCatalogObjectType.DATA_SOURCE);
-    addedObject.setData_source(dataSource.toThrift());
-    addedObject.setCatalog_version(dataSource.getCatalogVersion());
-    resp.result.setUpdated_catalog_object_DEPRECATED(addedObject);
-    resp.result.setVersion(dataSource.getCatalogVersion());
-  }
-
-  private void dropDataSource(TDropDataSourceParams params, TDdlExecResponse resp)
-      throws ImpalaException {
-    if (LOG.isDebugEnabled()) { LOG.debug("Drop DATA SOURCE: " + params.toString()); }
-    DataSource dataSource = catalog_.getDataSource(params.getData_source());
-    if (dataSource == null) {
-      if (!params.if_exists) {
-        throw new ImpalaRuntimeException("Data source " + params.getData_source() +
-            " does not exists.");
-      }
-      // The user specified IF EXISTS and the data source didn't exist, just
-      // return the current catalog version.
-      resp.result.setVersion(catalog_.getCatalogVersion());
-      return;
-    }
-    catalog_.removeDataSource(params.getData_source());
-    TCatalogObject removedObject = new TCatalogObject();
-    removedObject.setType(TCatalogObjectType.DATA_SOURCE);
-    removedObject.setData_source(dataSource.toThrift());
-    removedObject.setCatalog_version(dataSource.getCatalogVersion());
-    resp.result.setRemoved_catalog_object_DEPRECATED(removedObject);
-    resp.result.setVersion(dataSource.getCatalogVersion());
-  }
-
-  /**
-   * Drops all table and column stats from the target table in the HMS and
-   * updates the Impala catalog. Throws an ImpalaException if any errors are
-   * encountered as part of this operation. Acquires a lock on the modified table
-   * to protect against concurrent modifications.
-   */
-  private void dropStats(TDropStatsParams params, TDdlExecResponse resp)
-      throws ImpalaException {
-    Table table = getExistingTable(params.getTable_name().getDb_name(),
-        params.getTable_name().getTable_name());
-    Preconditions.checkNotNull(table);
-    catalog_.getLock().writeLock().lock();
-    synchronized(table) {
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      catalog_.getLock().writeLock().unlock();
-      if (params.getPartition_spec() == null) {
-        // TODO: Report the number of updated partitions/columns to the user?
-        dropColumnStats(table);
-        dropTableStats(table);
-      } else {
-        HdfsPartition partition =
-            ((HdfsTable)table).getPartitionFromThriftPartitionSpec(
-                params.getPartition_spec());
-        if (partition == null) {
-          List<String> partitionDescription = Lists.newArrayList();
-          for (TPartitionKeyValue v: params.getPartition_spec()) {
-            partitionDescription.add(v.name + " = " + v.value);
-          }
-          throw new ImpalaRuntimeException("Could not find partition: " +
-              Joiner.on("/").join(partitionDescription));
-        }
-
-        if (partition.getPartitionStats() != null)  {
-          PartitionStatsUtil.deletePartStats(partition);
-          try {
-            applyAlterPartition(table, partition);
-          } finally {
-            partition.markDirty();
-          }
-        }
-      }
-
-      loadTableMetadata(table, newCatalogVersion, false, true, null);
-      addTableToCatalogUpdate(table, resp.result);
-    } // end of synchronization
-  }
-
-  /**
-   * Drops all column stats from the table in the HMS. Returns the number of columns
-   * that were updated as part of this operation.
-   */
-  private int dropColumnStats(Table table) throws ImpalaRuntimeException {
-    Preconditions.checkState(Thread.holdsLock(table));
-    int numColsUpdated = 0;
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      for (Column col: table.getColumns()) {
-        // Skip columns that don't have stats.
-        if (!col.getStats().hasStats()) continue;
-
-        try {
-          msClient.getHiveClient().deleteTableColumnStatistics(
-              table.getDb().getName(), table.getName(), col.getName());
-          ++numColsUpdated;
-        } catch (NoSuchObjectException e) {
-          // We don't care if the column stats do not exist, just ignore the exception.
-          // We would only expect to make it here if the Impala and HMS metadata
-          // diverged.
-        } catch (TException e) {
-          throw new ImpalaRuntimeException(
-              String.format(HMS_RPC_ERROR_FORMAT_STR,
-                  "delete_table_column_statistics"), e);
-        }
-      }
-    }
-    return numColsUpdated;
-  }
-
-  /**
-   * Drops all table and partition stats from this table in the HMS.
-   * Partitions are updated in batches of MAX_PARTITION_UPDATES_PER_RPC. Returns
-   * the number of partitions updated as part of this operation, or 1 if the table
-   * is unpartitioned.
-   */
-  private int dropTableStats(Table table) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(table));
-    // Delete the ROW_COUNT from the table (if it was set).
-    org.apache.hadoop.hive.metastore.api.Table msTbl = table.getMetaStoreTable();
-    int numTargetedPartitions = 0;
-    if (msTbl.getParameters().remove(StatsSetupConst.ROW_COUNT) != null) {
-      applyAlterTable(msTbl);
-      ++numTargetedPartitions;
-    }
-
-    if (!(table instanceof HdfsTable) || table.getNumClusteringCols() == 0) {
-      // If this is not an HdfsTable or if the table is not partitioned, there
-      // is no more work to be done so just return.
-      return numTargetedPartitions;
-    }
-
-    // Now clear the stats for all partitions in the table.
-    HdfsTable hdfsTable = (HdfsTable) table;
-    Preconditions.checkNotNull(hdfsTable);
-
-    // List of partitions that were modified as part of this operation.
-    List<HdfsPartition> modifiedParts = Lists.newArrayList();
-    for (HdfsPartition part: hdfsTable.getPartitions()) {
-      boolean isModified = false;
-      // The default partition is an Impala-internal abstraction and is not
-      // represented in the Hive Metastore.
-      if (part.getId() == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) {
-        continue;
-      }
-      if (part.getPartitionStats() != null) {
-        PartitionStatsUtil.deletePartStats(part);
-        isModified = true;
-      }
-
-      // Remove the ROW_COUNT parameter if it has been set.
-      if (part.getParameters().remove(StatsSetupConst.ROW_COUNT) != null) {
-        isModified = true;
-      }
-
-      if (isModified) modifiedParts.add(part);
-    }
-
-    bulkAlterPartitions(table.getDb().getName(), table.getName(), modifiedParts);
-    return modifiedParts.size();
-  }
-
-  /**
-   * Drops a database from the metastore and removes the database's metadata from the
-   * internal cache. Re-throws any Hive Meta Store exceptions encountered during
-   * the drop.
-   */
-  private void dropDatabase(TDropDbParams params, TDdlExecResponse resp)
-      throws ImpalaException {
-    Preconditions.checkNotNull(params);
-    Preconditions.checkState(params.getDb() != null && !params.getDb().isEmpty(),
-        "Null or empty database name passed as argument to Catalog.dropDatabase");
-
-    LOG.debug("Dropping database " + params.getDb());
-    Db db = catalog_.getDb(params.db);
-    if (db != null && db.numFunctions() > 0 && !params.cascade) {
-      throw new CatalogException("Database " + db.getName() + " is not empty");
-    }
-
-    TCatalogObject removedObject = new TCatalogObject();
-    synchronized (metastoreDdlLock_) {
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-        msClient.getHiveClient().dropDatabase(
-            params.getDb(), true, params.if_exists, params.cascade);
-      } catch (TException e) {
-        throw new ImpalaRuntimeException(
-            String.format(HMS_RPC_ERROR_FORMAT_STR, "dropDatabase"), e);
-      }
-      Db removedDb = catalog_.removeDb(params.getDb());
-      // If no db was removed as part of this operation just return the current catalog
-      // version.
-      if (removedDb == null) {
-        removedObject.setCatalog_version(catalog_.getCatalogVersion());
-      } else {
-        removedObject.setCatalog_version(removedDb.getCatalogVersion());
-      }
-    }
-    removedObject.setType(TCatalogObjectType.DATABASE);
-    removedObject.setDb(new TDatabase());
-    removedObject.getDb().setDb_name(params.getDb());
-    resp.result.setVersion(removedObject.getCatalog_version());
-    resp.result.setRemoved_catalog_object_DEPRECATED(removedObject);
-  }
-
-  /**
-   * Drops a table or view from the metastore and removes it from the catalog.
-   * Also drops all associated caching requests on the table and/or table's partitions,
-   * uncaching all table data. If params.purge is true, table data is permanently
-   * deleted.
-   */
-  private void dropTableOrView(TDropTableOrViewParams params, TDdlExecResponse resp)
-      throws ImpalaException {
-    TableName tableName = TableName.fromThrift(params.getTable_name());
-    Preconditions.checkState(tableName != null && tableName.isFullyQualified());
-    LOG.debug(String.format("Dropping table/view %s", tableName));
-
-    TCatalogObject removedObject = new TCatalogObject();
-    synchronized (metastoreDdlLock_) {
-
-      // Forward the DDL operation to the specified storage backend.
-      try {
-        org.apache.hadoop.hive.metastore.api.Table msTbl = getExistingTable(
-            tableName.getDb(), tableName.getTbl()).getMetaStoreTable();
-        DdlDelegate handler = createDdlDelegate(msTbl);
-        handler.dropTable();
-      } catch (TableNotFoundException | DatabaseNotFoundException e) {
-        // Do nothing
-      }
-
-      Db db = catalog_.getDb(params.getTable_name().db_name);
-      if (db == null) {
-        if (params.if_exists) return;
-        throw new CatalogException("Database does not exist: " +
-            params.getTable_name().db_name);
-      }
-      Table existingTbl = db.getTable(params.getTable_name().table_name);
-      if (existingTbl == null) {
-        if (params.if_exists) return;
-        throw new CatalogException("Table/View does not exist: " + tableName);
-      }
-      // Check to make sure we don't drop a view with "drop table" statement and
-      // vice versa. is_table field is marked optional in TDropTableOrViewParams to
-      // maintain catalog api compatibility.
-      // TODO: Remove params.isSetIs_table() check once catalog api compatibility is
-      // fixed.
-      if (params.isSetIs_table() && ((params.is_table && existingTbl instanceof View)
-          || (!params.is_table && !(existingTbl instanceof View)))) {
-        if (params.if_exists) return;
-        String errorMsg = "DROP " + (params.is_table ? "TABLE " : "VIEW ") +
-            "not allowed on a " + (params.is_table ? "view: " : "table: ") + tableName;
-        throw new CatalogException(errorMsg);
-      }
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-        msClient.getHiveClient().dropTable(
-            tableName.getDb(), tableName.getTbl(), true, params.if_exists, params.purge);
-      } catch (TException e) {
-        throw new ImpalaRuntimeException(
-            String.format(HMS_RPC_ERROR_FORMAT_STR, "dropTable"), e);
-      }
-
-      Table table = catalog_.removeTable(params.getTable_name().db_name,
-          params.getTable_name().table_name);
-      if (table != null) {
-        resp.result.setVersion(table.getCatalogVersion());
-        if (table instanceof HdfsTable) {
-          HdfsTable hdfsTable = (HdfsTable) table;
-          if (hdfsTable.isMarkedCached()) {
-            try {
-              HdfsCachingUtil.uncacheTbl(table.getMetaStoreTable());
-            } catch (Exception e) {
-              LOG.error("Unable to uncache table: " + table.getFullName(), e);
-            }
-          }
-          if (table.getNumClusteringCols() > 0) {
-            for (HdfsPartition partition: hdfsTable.getPartitions()) {
-              if (partition.isMarkedCached()) {
-                try {
-                  HdfsCachingUtil.uncachePartition(partition);
-                } catch (Exception e) {
-                  LOG.error("Unable to uncache partition: " +
-                      partition.getPartitionName(), e);
-                }
-              }
-            }
-          }
-        }
-      } else {
-        resp.result.setVersion(catalog_.getCatalogVersion());
-      }
-    }
-    removedObject.setType(TCatalogObjectType.TABLE);
-    removedObject.setTable(new TTable());
-    removedObject.getTable().setTbl_name(tableName.getTbl());
-    removedObject.getTable().setDb_name(tableName.getDb());
-    removedObject.setCatalog_version(resp.result.getVersion());
-    resp.result.setRemoved_catalog_object_DEPRECATED(removedObject);
-  }
-
-  /**
-   * Truncate a table by deleting all files in its partition directories, and dropping
-   * all column and table statistics. Acquires a table lock to protect against
-   * concurrent table modifications.
-   * TODO truncate specified partitions.
-   */
-  private void truncateTable(TTruncateParams params, TDdlExecResponse resp)
-      throws ImpalaException {
-    TTableName tblName = params.getTable_name();
-    Table table = null;
-    try {
-      table = getExistingTable(tblName.getDb_name(), tblName.getTable_name());
-    } catch (TableNotFoundException e) {
-      if (params.if_exists) return;
-      throw e;
-    }
-    Preconditions.checkNotNull(table);
-    if (!(table instanceof HdfsTable)) {
-      throw new CatalogException(
-          String.format("TRUNCATE TABLE not supported on non-HDFS table: %s",
-          table.getFullName()));
-    }
-    catalog_.getLock().writeLock().lock();
-    synchronized(table) {
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      catalog_.getLock().writeLock().unlock();
-      try {
-        HdfsTable hdfsTable = (HdfsTable)table;
-        for (HdfsPartition part: hdfsTable.getPartitions()) {
-          if (part.isDefaultPartition()) continue;
-          FileSystemUtil.deleteAllVisibleFiles(new Path(part.getLocation()));
-        }
-
-        dropColumnStats(table);
-        dropTableStats(table);
-      } catch (Exception e) {
-        String fqName = tblName.db_name + "." + tblName.table_name;
-        throw new CatalogException(String.format("Failed to truncate table: %s.\n" +
-            "Table may be in a partially truncated state.", fqName), e);
-      }
-
-      loadTableMetadata(table, newCatalogVersion, true, true, null);
-      addTableToCatalogUpdate(table, resp.result);
-    } // end synchronization
-  }
-
-  private void dropFunction(TDropFunctionParams params, TDdlExecResponse resp)
-      throws ImpalaException {
-    FunctionName fName = FunctionName.fromThrift(params.fn_name);
-    synchronized (metastoreDdlLock_) {
-      Db db = catalog_.getDb(fName.getDb());
-      if (db == null) {
-        if (!params.if_exists) {
-            throw new CatalogException("Database: " + fName.getDb()
-                + " does not exist.");
-        }
-        return;
-      }
-      List<TCatalogObject> removedFunctions = Lists.newArrayList();
-      if (!params.isSetSignature()) {
-        dropJavaFunctionFromHms(fName.getDb(), fName.getFunction(), params.if_exists);
-        for (Function fn: db.getFunctions(fName.getFunction())) {
-          if (fn.getBinaryType() != TFunctionBinaryType.JAVA
-              || !fn.isPersistent()) {
-            continue;
-          }
-          Preconditions.checkNotNull(catalog_.removeFunction(fn));
-          removedFunctions.add(buildTCatalogFnObject(fn));
-        }
-      } else {
-        ArrayList<Type> argTypes = Lists.newArrayList();
-        for (TColumnType t: params.arg_types) {
-          argTypes.add(Type.fromThrift(t));
-        }
-        Function desc = new Function(fName, argTypes, Type.INVALID, false);
-        Function fn = catalog_.removeFunction(desc);
-        if (fn == null) {
-          if (!params.if_exists) {
-            throw new CatalogException(
-                "Function: " + desc.signatureString() + " does not exist.");
-          }
-        } else {
-          // Flush DB changes to metastore
-          applyAlterDatabase(catalog_.getDb(fn.dbName()));
-          removedFunctions.add(buildTCatalogFnObject(fn));
-        }
-      }
-
-      if (!removedFunctions.isEmpty()) {
-        // Distinguish which result field to set based on the type of functions removed
-        // for backwards compatibility. For example, BDR relies on a stable catalog
-        // Thrift API.
-        if (!params.isSetSignature()) {
-          // Removing all signatures of a persistent Java UDF.
-          resp.result.setRemoved_catalog_objects(removedFunctions);
-        } else {
-          Preconditions.checkState(removedFunctions.size() == 1);
-          resp.result.setRemoved_catalog_object_DEPRECATED(removedFunctions.get(0));
-        }
-      }
-      resp.result.setVersion(catalog_.getCatalogVersion());
-    }
-  }
-
-  /**
-   * Creates a new table in the metastore and adds an entry to the metadata cache to
-   * lazily load the new metadata on the next access. Re-throws any Hive Meta Store
-   * exceptions encountered during the create.
-   */
-  private boolean createTable(TCreateTableParams params, TDdlExecResponse response)
-      throws ImpalaException {
-    Preconditions.checkNotNull(params);
-    TableName tableName = TableName.fromThrift(params.getTable_name());
-    Preconditions.checkState(tableName != null && tableName.isFullyQualified());
-    Preconditions.checkState(params.getColumns() != null &&
-        params.getColumns().size() > 0,
-        "Null or empty column list given as argument to Catalog.createTable");
-
-    if (params.if_not_exists &&
-        catalog_.containsTable(tableName.getDb(), tableName.getTbl())) {
-      LOG.debug(String.format("Skipping table creation because %s already exists and " +
-          "IF NOT EXISTS was specified.", tableName));
-      response.getResult().setVersion(catalog_.getCatalogVersion());
-      return false;
-    }
-    org.apache.hadoop.hive.metastore.api.Table tbl =
-        createMetaStoreTable(params);
-    LOG.debug(String.format("Creating table %s", tableName));
-    return createTable(tbl, params.if_not_exists, params.getCache_op(),
-        params.getDistribute_by(), response);
-  }
-
-  /**
-   * Creates a new view in the metastore and adds an entry to the metadata cache to
-   * lazily load the new metadata on the next access. Re-throws any Metastore
-   * exceptions encountered during the create.
-   */
-  private void createView(TCreateOrAlterViewParams params, TDdlExecResponse response)
-      throws ImpalaException {
-    TableName tableName = TableName.fromThrift(params.getView_name());
-    Preconditions.checkState(tableName != null && tableName.isFullyQualified());
-    Preconditions.checkState(params.getColumns() != null &&
-        params.getColumns().size() > 0,
-          "Null or empty column list given as argument to DdlExecutor.createView");
-    if (params.if_not_exists &&
-        catalog_.containsTable(tableName.getDb(), tableName.getTbl())) {
-      LOG.debug(String.format("Skipping view creation because %s already exists and " +
-          "ifNotExists is true.", tableName));
-    }
-
-    // Create new view.
-    org.apache.hadoop.hive.metastore.api.Table view =
-        new org.apache.hadoop.hive.metastore.api.Table();
-    setViewAttributes(params, view);
-    LOG.debug(String.format("Creating view %s", tableName));
-    createTable(view, params.if_not_exists, null, null, response);
-  }
-
-  /**
-   * Creates a new table in the metastore based on the definition of an existing table.
-   * No data is copied as part of this process, it is a metadata only operation. If the
-   * creation succeeds, an entry is added to the metadata cache to lazily load the new
-   * table's metadata on the next access.
-   */
-  private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse response)
-      throws ImpalaException {
-    Preconditions.checkNotNull(params);
-
-    THdfsFileFormat fileFormat =
-        params.isSetFile_format() ? params.getFile_format() : null;
-    String comment = params.isSetComment() ? params.getComment() : null;
-    TableName tblName = TableName.fromThrift(params.getTable_name());
-    TableName srcTblName = TableName.fromThrift(params.getSrc_table_name());
-    Preconditions.checkState(tblName != null && tblName.isFullyQualified());
-    Preconditions.checkState(srcTblName != null && srcTblName.isFullyQualified());
-
-    if (params.if_not_exists &&
-        catalog_.containsTable(tblName.getDb(), tblName.getTbl())) {
-      LOG.debug(String.format("Skipping table creation because %s already exists and " +
-          "IF NOT EXISTS was specified.", tblName));
-      response.getResult().setVersion(catalog_.getCatalogVersion());
-      return;
-    }
-    Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl());
-    org.apache.hadoop.hive.metastore.api.Table tbl =
-        srcTable.getMetaStoreTable().deepCopy();
-    tbl.setDbName(tblName.getDb());
-    tbl.setTableName(tblName.getTbl());
-    tbl.setOwner(params.getOwner());
-    if (tbl.getParameters() == null) {
-      tbl.setParameters(new HashMap<String, String>());
-    }
-    if (comment != null) {
-      tbl.getParameters().put("comment", comment);
-    }
-    // The EXTERNAL table property should not be copied from the old table.
-    if (params.is_external) {
-      tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
-      tbl.putToParameters("EXTERNAL", "TRUE");
-    } else {
-      tbl.setTableType(TableType.MANAGED_TABLE.toString());
-      if (tbl.getParameters().containsKey("EXTERNAL")) {
-        tbl.getParameters().remove("EXTERNAL");
-      }
-    }
-
-    // We should not propagate hdfs caching parameters to the new table.
-    if (tbl.getParameters().containsKey(
-        HdfsCachingUtil.CACHE_DIR_ID_PROP_NAME)) {
-      tbl.getParameters().remove(HdfsCachingUtil.CACHE_DIR_ID_PROP_NAME);
-    }
-    if (tbl.getParameters().containsKey(
-        HdfsCachingUtil.CACHE_DIR_REPLICATION_PROP_NAME)) {
-      tbl.getParameters().remove(
-        HdfsCachingUtil.CACHE_DIR_REPLICATION_PROP_NAME);
-    }
-
-    // The LOCATION property should not be copied from the old table. If the location
-    // is null (the caller didn't specify a custom location) this will clear the value
-    // and the table will use the default table location from the parent database.
-    tbl.getSd().setLocation(params.getLocation());
-    if (fileFormat != null) {
-      setStorageDescriptorFileFormat(tbl.getSd(), fileFormat);
-    } else if (fileFormat == null && srcTable instanceof View) {
-      // Here, source table is a view which has no input format. So to be
-      // consistent with CREATE TABLE, default input format is assumed to be
-      // TEXT unless otherwise specified.
-      setStorageDescriptorFileFormat(tbl.getSd(), THdfsFileFormat.TEXT);
-    }
-    // Set the row count of this table to unknown.
-    tbl.putToParameters(StatsSetupConst.ROW_COUNT, "-1");
-    LOG.debug(String.format("Creating table %s LIKE %s", tblName, srcTblName));
-    createTable(tbl, params.if_not_exists, null, null, response);
-  }
-
-  /**
-   * Creates a new table in the HMS. If ifNotExists=true, no error will be thrown if
-   * the table already exists, otherwise an exception will be thrown.
-   * Accepts an optional 'cacheOp' param, which if specified will cache the table's
-   * HDFS location according to the 'cacheOp' spec after creation.
-   * Stores details of the operations (such as the resulting catalog version) in
-   * 'response' output parameter.
-   * Returns true if a new table was created as part of this call, false otherwise.
-   */
-  private boolean createTable(org.apache.hadoop.hive.metastore.api.Table newTable,
-      boolean ifNotExists, THdfsCachingOp cacheOp, List<TDistributeParam> distribute_by,
-      TDdlExecResponse response)
-      throws ImpalaException {
-    synchronized (metastoreDdlLock_) {
-
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-        msClient.getHiveClient().createTable(newTable);
-        // If this table should be cached, and the table location was not specified by
-        // the user, an extra step is needed to read the table to find the location.
-        if (cacheOp != null && cacheOp.isSet_cached() &&
-            newTable.getSd().getLocation() == null) {
-          newTable = msClient.getHiveClient().getTable(newTable.getDbName(),
-              newTable.getTableName());
-        }
-      } catch (AlreadyExistsException e) {
-        if (!ifNotExists) {
-          throw new ImpalaRuntimeException(
-              String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
-        }
-        LOG.debug(String.format("Ignoring '%s' when creating table %s.%s because " +
-            "IF NOT EXISTS was specified.", e,
-            newTable.getDbName(), newTable.getTableName()));
-        return false;
-      } catch (TException e) {
-        throw new ImpalaRuntimeException(
-            String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
-      }
-
-      // Forward the operation to a specific storage backend. If the operation fails,
-      // delete the just created hive table to avoid inconsistencies.
-      try {
-        createDdlDelegate(newTable).setDistributeParams(distribute_by).createTable();
-      } catch (ImpalaRuntimeException e) {
-        try (MetaStoreClient c = catalog_.getMetaStoreClient()) {
-          c.getHiveClient().dropTable(newTable.getDbName(), newTable.getTableName(),
-              false, ifNotExists);
-        } catch (Exception hE) {
-          throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
-              "dropTable"), hE);
-        }
-        throw e;
-      }
-
-      // Submit the cache request and update the table metadata.
-      if (cacheOp != null && cacheOp.isSet_cached()) {
-        short replication = cacheOp.isSetReplication() ? cacheOp.getReplication() :
-            JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
-        long id = HdfsCachingUtil.submitCacheTblDirective(newTable,
-            cacheOp.getCache_pool_name(), replication);
-        catalog_.watchCacheDirs(Lists.<Long>newArrayList(id),
-            new TTableName(newTable.getDbName(), newTable.getTableName()));
-        applyAlterTable(newTable);
-      }
-      Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
-      addTableToCatalogUpdate(newTbl, response.result);
-    }
-    return true;
-  }
-
-  /**
-   * Instantiate the appropriate DDL delegate for the table. If no known delegate is
-   * available for the table, returns a UnsupportedOpDelegate instance.
-   */
-  private DdlDelegate createDdlDelegate(org.apache.hadoop.hive.metastore.api.Table tab) {
-    if (KuduDdlDelegate.canHandle(tab)) return new KuduDdlDelegate(tab);
-    return new UnsupportedOpDelegate();
-  }
-
-  /**
-   * Sets the given params in the metastore table as appropriate for a view.
-   */
-  private void setViewAttributes(TCreateOrAlterViewParams params,
-      org.apache.hadoop.hive.metastore.api.Table view) {
-    view.setTableType(TableType.VIRTUAL_VIEW.toString());
-    view.setViewOriginalText(params.getOriginal_view_def());
-    view.setViewExpandedText(params.getExpanded_view_def());
-    view.setDbName(params.getView_name().getDb_name());
-    view.setTableName(params.getView_name().getTable_name());
-    view.setOwner(params.getOwner());
-    if (view.getParameters() == null) view.setParameters(new HashMap<String, String>());
-    if (params.isSetComment() &&  params.getComment() != null) {
-      view.getParameters().put("comment", params.getComment());
-    }
-
-    // Add all the columns to a new storage descriptor.
-    StorageDescriptor sd = new StorageDescriptor();
-    sd.setCols(buildFieldSchemaList(params.getColumns()));
-    // Set a dummy SerdeInfo for Hive.
-    sd.setSerdeInfo(new SerDeInfo());
-    view.setSd(sd);
-  }
-
-  /**
-   * Appends one or more columns to the given table, optionally replacing all existing
-   * columns.
-   */
-  private void alterTableAddReplaceCols(Table tbl, List<TColumn> columns,
-      boolean replaceExistingCols) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
-    org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
-    List<FieldSchema> newColumns = buildFieldSchemaList(columns);
-    if (replaceExistingCols) {
-      msTbl.getSd().setCols(newColumns);
-    } else {
-      // Append the new column to the existing list of columns.
-      for (FieldSchema fs: buildFieldSchemaList(columns)) {
-        msTbl.getSd().addToCols(fs);
-      }
-    }
-    applyAlterTable(msTbl);
-  }
-
-  /**
-   * Changes the column definition of an existing column. This can be used to rename a
-   * column, add a comment to a column, or change the datatype of a column.
-   */
-  private void alterTableChangeCol(Table tbl, String colName,
-      TColumn newCol) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
-    org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
-    // Find the matching column name and change it.
-    Iterator<FieldSchema> iterator = msTbl.getSd().getColsIterator();
-    while (iterator.hasNext()) {
-      FieldSchema fs = iterator.next();
-      if (fs.getName().toLowerCase().equals(colName.toLowerCase())) {
-        fs.setName(newCol.getColumnName());
-        Type type = Type.fromThrift(newCol.getColumnType());
-        fs.setType(type.toSql().toLowerCase());
-        // Don't overwrite the existing comment unless a new comment is given
-        if (newCol.getComment() != null) {
-          fs.setComment(newCol.getComment());
-        }
-        break;
-      }
-      if (!iterator.hasNext()) {
-        throw new ColumnNotFoundException(String.format(
-            "Column name %s not found in table %s.", colName, tbl.getFullName()));
-      }
-    }
-    applyAlterTable(msTbl);
-  }
-
-  /**
-   * Adds a new partition to the given table in Hive. Also creates and adds
-   * a new HdfsPartition to the corresponding HdfsTable.
-   * If cacheOp is not null, the partition's location will be cached according
-   * to the cacheOp. If cacheOp is null, the new partition will inherit the
-   * the caching properties of the parent table.
-   * Returns null if the partition already exists in Hive and "IfNotExists"
-   * is true. Otherwise, returns the table object with an updated catalog version.
-   */
-  private Table alterTableAddPartition(Table tbl, List<TPartitionKeyValue> partitionSpec,
-      boolean ifNotExists, String location, THdfsCachingOp cacheOp)
-      throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
-    TableName tableName = tbl.getTableName();
-    if (ifNotExists && catalog_.containsHdfsPartition(tableName.getDb(),
-        tableName.getTbl(), partitionSpec)) {
-      LOG.debug(String.format("Skipping partition creation because (%s) already exists" +
-          " and ifNotExists is true.", Joiner.on(", ").join(partitionSpec)));
-      return null;
-    }
-
-    org.apache.hadoop.hive.metastore.api.Partition partition = null;
-    Table result = null;
-    List<Long> cacheIds = null;
-    org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
-    Long parentTblCacheDirId =
-        HdfsCachingUtil.getCacheDirectiveId(msTbl.getParameters());
-
-    partition = createHmsPartition(partitionSpec, msTbl, tableName, location);
-
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      // Add the new partition.
-      partition = msClient.getHiveClient().add_partition(partition);
-      String cachePoolName = null;
-      Short replication = null;
-      if (cacheOp == null && parentTblCacheDirId != null) {
-        // The user didn't specify an explicit caching operation, inherit the value
-        // from the parent table.
-        cachePoolName = HdfsCachingUtil.getCachePool(parentTblCacheDirId);
-        Preconditions.checkNotNull(cachePoolName);
-        replication = HdfsCachingUtil.getCacheReplication(parentTblCacheDirId);
-        Preconditions.checkNotNull(replication);
-      } else if (cacheOp != null && cacheOp.isSet_cached()) {
-        // The user explicitly stated that this partition should be cached.
-        cachePoolName = cacheOp.getCache_pool_name();
-
-        // When the new partition should be cached and and no replication factor
-        // was specified, inherit the replication factor from the parent table if
-        // it is cached. If the parent is not cached and no replication factor is
-        // explicitly set, use the default value.
-        if (!cacheOp.isSetReplication() && parentTblCacheDirId != null) {
-          replication = HdfsCachingUtil.getCacheReplication(parentTblCacheDirId);
-        } else {
-          replication = HdfsCachingUtil.getReplicationOrDefault(cacheOp);
-        }
-      }
-      // If cache pool name is not null, it indicates this partition should be cached.
-      if (cachePoolName != null) {
-        long id = HdfsCachingUtil.submitCachePartitionDirective(partition,
-            cachePoolName, replication);
-        cacheIds = Lists.<Long>newArrayList(id);
-        // Update the partition metadata to include the cache directive id.
-        msClient.getHiveClient().alter_partition(partition.getDbName(),
-            partition.getTableName(), partition);
-      }
-      updateLastDdlTime(msTbl, msClient);
-    } catch (AlreadyExistsException e) {
-      if (!ifNotExists) {
-        throw new ImpalaRuntimeException(
-            String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partition"), e);
-      }
-      LOG.debug(String.format("Ignoring '%s' when adding partition to %s because" +
-          " ifNotExists is true.", e, tableName));
-    } catch (TException e) {
-      throw new ImpalaRuntimeException(
-          String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partition"), e);
-    }
-    if (cacheIds != null) catalog_.watchCacheDirs(cacheIds, tableName.toThrift());
-    // Return the table object with an updated catalog version after creating the
-    // partition.
-    result = addHdfsPartition(tbl, partition);
-    return result;
-  }
-
-  /**
-   * Drops an existing partition from the given table in Hive. If the partition is cached,
-   * the associated cache directive will also be removed.
-   * Also drops the partition from its Hdfs table.
-   * Returns the table object with an updated catalog version. If the partition does not
-   * exist and "IfExists" is true, null is returned. If purge is true, partition data is
-   * permanently deleted.
-   */
-  private Table alterTableDropPartition(Table tbl,
-      List<TPartitionKeyValue> partitionSpec, boolean ifExists, boolean purge)
-      throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
-    TableName tableName = tbl.getTableName();
-    if (ifExists && !catalog_.containsHdfsPartition(tableName.getDb(), tableName.getTbl(),
-        partitionSpec)) {
-      LOG.debug(String.format("Skipping partition drop because (%s) does not exist " +
-          "and ifExists is true.", Joiner.on(", ").join(partitionSpec)));
-      return null;
-    }
-
-    HdfsPartition part = catalog_.getHdfsPartition(tableName.getDb(),
-        tableName.getTbl(), partitionSpec);
-    org.apache.hadoop.hive.metastore.api.Table msTbl =
-        tbl.getMetaStoreTable().deepCopy();
-    List<String> values = Lists.newArrayList();
-    // Need to add in the values in the same order they are defined in the table.
-    for (FieldSchema fs: msTbl.getPartitionKeys()) {
-      for (TPartitionKeyValue kv: partitionSpec) {
-        if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) {
-          values.add(kv.getValue());
-        }
-      }
-    }
-    PartitionDropOptions dropOptions = PartitionDropOptions.instance();
-    dropOptions.purgeData(purge);
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      msClient.getHiveClient().dropPartition(tableName.getDb(),
-          tableName.getTbl(), values, dropOptions);
-      updateLastDdlTime(msTbl, msClient);
-      if (part.isMarkedCached()) {
-        HdfsCachingUtil.uncachePartition(part);
-      }
-    } catch (NoSuchObjectException e) {
-      if (!ifExists) {
-        throw new ImpalaRuntimeException(
-            String.format(HMS_RPC_ERROR_FORMAT_STR, "dropPartition"), e);
-      }
-      LOG.debug(String.format("Ignoring '%s' when dropping partition from %s because" +
-          " ifExists is true.", e, tableName));
-    } catch (TException e) {
-      throw new ImpalaRuntimeException(
-          String.format(HMS_RPC_ERROR_FORMAT_STR, "dropPartition"), e);
-    }
-    return catalog_.dropPartition(tbl, partitionSpec);
-  }
-
-  /**
-   * Removes a column from the given table.
-   */
-  private void alterTableDropCol(Table tbl, String colName) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
-    org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
-    // Find the matching column name and remove it.
-    Iterator<FieldSchema> iterator = msTbl.getSd().getColsIterator();
-    while (iterator.hasNext()) {
-      FieldSchema fs = iterator.next();
-      if (fs.getName().toLowerCase().equals(colName.toLowerCase())) {
-        iterator.remove();
-        break;
-      }
-      if (!iterator.hasNext()) {
-        throw new ColumnNotFoundException(String.format(
-            "Column name %s not found in table %s.", colName, tbl.getFullName()));
-      }
-    }
-    applyAlterTable(msTbl);
-  }
-
-  /**
-   * Renames an existing table or view. Saves, drops and restores the column stats for
-   * tables renamed across databases to work around HIVE-9720/IMPALA-1711.
-   * After renaming the table/view, its metadata is marked as invalid and will be
-   * reloaded on the next access.
-   */
-  private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
-      TDdlExecResponse response) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(oldTbl)
-        && catalog_.getLock().isWriteLockedByCurrentThread());
-    TableName tableName = oldTbl.getTableName();
-    org.apache.hadoop.hive.metastore.api.Table msTbl =
-        oldTbl.getMetaStoreTable().deepCopy();
-    msTbl.setDbName(newTableName.getDb());
-    msTbl.setTableName(newTableName.getTbl());
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      // Workaround for HIVE-9720/IMPALA-1711: When renaming a table with column
-      // stats across databases, we save, drop and restore the column stats because
-      // the HMS does not properly move them to the new table via alteration.
-      ColumnStatistics hmsColStats = null;
-      if (!msTbl.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.toString())
-          && !tableName.getDb().equalsIgnoreCase(newTableName.getDb())) {
-        Map<String, TColumnStats> colStats = Maps.newHashMap();
-        for (Column c: oldTbl.getColumns()) {
-          colStats.put(c.getName(), c.getStats().toThrift());
-        }
-        hmsColStats = createHiveColStats(colStats, oldTbl);
-        // Set the new db/table.
-        hmsColStats.setStatsDesc(new ColumnStatisticsDesc(true, newTableName.getDb(),
-            newTableName.getTbl()));
-
-        LOG.trace(String.format("Dropping column stats for table %s being " +
-            "renamed to %s to workaround HIVE-9720.",
-            tableName.toString(), newTableName.toString()));
-        // Delete all column stats of the original table from the HMS.
-        msClient.getHiveClient().deleteTableColumnStatistics(
-            tableName.getDb(), tableName.getTbl(), null);
-      }
-
-      // Perform the table rename in any case.
-      msClient.getHiveClient().alter_table(tableName.getDb(), tableName.getTbl(), msTbl);
-
-      if (hmsColStats != null) {
-        LOG.trace(String.format("Restoring column stats for table %s being " +
-            "renamed to %s to workaround HIVE-9720.",
-            tableName.toString(), newTableName.toString()));
-        msClient.getHiveClient().updateTableColumnStatistics(hmsColStats);
-      }
-    } catch (TException e) {
-      throw new ImpalaRuntimeException(
-          String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e);
-    }
-    // Rename the table in the Catalog and get the resulting catalog object.
-    // ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP.
-    TCatalogObject newTable = TableToTCatalogObject(
-        catalog_.renameTable(tableName.toThrift(), newTableName.toThrift()));
-    TCatalogObject removedObject = new TCatalogObject();
-    removedObject.setType(TCatalogObjectType.TABLE);
-    removedObject.setTable(new TTable());
-    removedObject.getTable().setTbl_name(tableName.getTbl());
-    removedObject.getTable().setDb_name(tableName.getDb());
-    removedObject.setCatalog_version(newTable.getCatalog_version());
-    response.result.setRemoved_catalog_object_DEPRECATED(removedObject);
-    response.result.setUpdated_catalog_object_DEPRECATED(newTable);
-    response.result.setVersion(newTable.getCatalog_version());
-  }
-
-  /**
-   * Changes the file format for the given table or partition. This is a metadata only
-   * operation, existing table data will not be converted to the new format. After
-   * changing the file format the table metadata is marked as invalid and will be
-   * reloaded on the next access.
-   */
-  private boolean alterTableSetFileFormat(Table tbl,
-      List<TPartitionKeyValue> partitionSpec, THdfsFileFormat fileFormat)
-      throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
-    Preconditions.checkState(partitionSpec == null || !partitionSpec.isEmpty());
-    boolean reloadFileMetadata = false;
-    if (partitionSpec == null) {
-      org.apache.hadoop.hive.metastore.api.Table msTbl =
-          tbl.getMetaStoreTable().deepCopy();
-      setStorageDescriptorFileFormat(msTbl.getSd(), fileFormat);
-      applyAlterTable(msTbl);
-      reloadFileMetadata = true;
-    } else {
-      TableName tableName = tbl.getTableName();
-      HdfsPartition partition = catalog_.getHdfsPartition(
-          tableName.getDb(), tableName.getTbl(), partitionSpec);
-      Preconditions.checkNotNull(partition);
-      partition.setFileFormat(HdfsFileFormat.fromThrift(fileFormat));
-      try {
-        applyAlterPartition(tbl, partition);
-      } finally {
-        partition.markDirty();
-      }
-    }
-    return reloadFileMetadata;
-  }
-
-  /**
-   * Helper method for setting the file format on a given storage descriptor.
-   */
-  private static void setStorageDescriptorFileFormat(StorageDescriptor sd,
-      THdfsFileFormat fileFormat) {
-    StorageDescriptor tempSd =
-        HiveStorageDescriptorFactory.createSd(fileFormat, RowFormat.DEFAULT_ROW_FORMAT);
-    sd.setInputFormat(tempSd.getInputFormat());
-    sd.setOutputFormat(tempSd.getOutputFormat());
-    sd.getSerdeInfo().setSerializationLib(tempSd.getSerdeInfo().getSerializationLib());
-  }
-
-  /**
-   * Changes the HDFS storage location for the given table. This is a metadata only
-   * operation, existing table data will not be as part of changing the location.
-   */
-  private boolean alterTableSetLocation(Table tbl,
-      List<TPartitionKeyValue> partitionSpec, String location) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
-    Preconditions.checkState(partitionSpec == null || !partitionSpec.isEmpty());
-    boolean reloadFileMetadata = false;
-    if (partitionSpec == null) {
-      org.apache.hadoop.hive.metastore.api.Table msTbl =
-          tbl.getMetaStoreTable().deepCopy();
-      if (msTbl.getPartitionKeysSize() == 0) reloadFileMetadata = true;
-      msTbl.getSd().setLocation(location);
-      applyAlterTable(msTbl);
-    } else {
-      TableName tableName = tbl.getTableName();
-      HdfsPartition partition = catalog_.getHdfsPartition(
-          tableName.getDb(), tableName.getTbl(), partiti

<TRUNCATED>