You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:48 UTC

[31/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
deleted file mode 100644
index 27d25e1..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
+++ /dev/null
@@ -1,1268 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.UUID;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FunctionType;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.ResourceType;
-import org.apache.hadoop.hive.metastore.api.ResourceUri;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.hadoop.hive.ql.exec.FunctionUtils;
-import org.apache.log4j.Logger;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.TException;
-
-import com.cloudera.impala.analysis.TableName;
-import com.cloudera.impala.authorization.SentryConfig;
-import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient;
-import com.cloudera.impala.common.FileSystemUtil;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.ImpalaRuntimeException;
-import com.cloudera.impala.common.JniUtil;
-import com.cloudera.impala.common.Pair;
-import com.cloudera.impala.hive.executor.UdfExecutor;
-import com.cloudera.impala.thrift.TCatalog;
-import com.cloudera.impala.thrift.TCatalogObject;
-import com.cloudera.impala.thrift.TCatalogObjectType;
-import com.cloudera.impala.thrift.TFunction;
-import com.cloudera.impala.thrift.TFunctionBinaryType;
-import com.cloudera.impala.thrift.TGetAllCatalogObjectsResponse;
-import com.cloudera.impala.thrift.TPartitionKeyValue;
-import com.cloudera.impala.thrift.TPrivilege;
-import com.cloudera.impala.thrift.TTable;
-import com.cloudera.impala.thrift.TTableName;
-import com.cloudera.impala.thrift.TUniqueId;
-import com.cloudera.impala.util.PatternMatcher;
-import com.cloudera.impala.util.SentryProxy;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
-
-/**
- * Specialized Catalog that implements the CatalogService specific Catalog
- * APIs. The CatalogServiceCatalog manages loading of all the catalog metadata
- * and processing of DDL requests. For each DDL request, the CatalogServiceCatalog
- * will return the catalog version that the update will show up in. The client
- * can then wait until the statestore sends an update that contains that catalog
- * version.
- * The CatalogServiceCatalog also manages a global "catalog version". The version
- * is incremented and assigned to a CatalogObject whenever it is
- * added/modified/removed from the catalog. This means each CatalogObject will have a
- * unique version and assigned versions are strictly increasing.
- *
- * Table metadata for IncompleteTables (not fully loaded tables) are loaded in the
- * background by the TableLoadingMgr; tables can be prioritized for loading by calling
- * prioritizeLoad(). Background loading can also be enabled for the catalog, in which
- * case missing tables (tables that are not yet loaded) are submitted to the
- * TableLoadingMgr any table metadata is invalidated and on startup. The metadata of
- * fully loaded tables (e.g. HdfsTable, HBaseTable, etc) are updated in-place and don't
- * trigger a background metadata load through the TableLoadingMgr. Accessing a table
- * that is not yet loaded (via getTable()), will load the table's metadata on-demand,
- * out-of-band of the table loading thread pool.
- *
- * See the class comments in CatalogOpExecutor for a description of the locking protocol
- * that should be employed if both the catalog lock and table locks need to be held at
- * the same time.
- *
- * TODO: Consider removing on-demand loading and have everything go through the table
- * loading thread pool.
- */
-public class CatalogServiceCatalog extends Catalog {
-  private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);
-
-  private final TUniqueId catalogServiceId_;
-
-  // Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock
-  // protects catalogVersion_, it can be used to perform atomic bulk catalog operations
-  // since catalogVersion_ cannot change externally while the lock is being held.
-  // In addition to protecting catalogVersion_, it is currently used for the
-  // following bulk operations:
-  // * Building a delta update to send to the statestore in getCatalogObjects(),
-  //   so a snapshot of the catalog can be taken without any version changes.
-  // * During a catalog invalidation (call to reset()), which re-reads all dbs and tables
-  //   from the metastore.
-  // * During renameTable(), because a table must be removed and added to the catalog
-  //   atomically (potentially in a different database).
-  private final ReentrantReadWriteLock catalogLock_ = new ReentrantReadWriteLock(true);
-
-  // Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented
-  // with each update to the Catalog. Continued across the lifetime of the object.
-  // Protected by catalogLock_.
-  // TODO: Handle overflow of catalogVersion_ and nextTableId_.
-  // TODO: The name of this variable is misleading and can be interpreted as a property
-  // of the catalog server. Rename into something that indicates its role as a global
-  // sequence number assigned to catalog objects.
-  private long catalogVersion_ = INITIAL_CATALOG_VERSION;
-
-  protected final AtomicInteger nextTableId_ = new AtomicInteger(0);
-
-  // Manages the scheduling of background table loading.
-  private final TableLoadingMgr tableLoadingMgr_;
-
-  private final boolean loadInBackground_;
-
-  // Periodically polls HDFS to get the latest set of known cache pools.
-  private final ScheduledExecutorService cachePoolReader_ =
-      Executors.newScheduledThreadPool(1);
-
-  // Proxy to access the Sentry Service and also periodically refreshes the
-  // policy metadata. Null if Sentry Service is not enabled.
-  private final SentryProxy sentryProxy_;
-
-  // Local temporary directory to copy UDF Jars.
-  private static final String LOCAL_LIBRARY_PATH = new String("file://" +
-      System.getProperty("java.io.tmpdir"));
-
-  /**
-   * Initialize the CatalogServiceCatalog. If loadInBackground is true, table metadata
-   * will be loaded in the background
-   */
-  public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
-      SentryConfig sentryConfig, TUniqueId catalogServiceId, String kerberosPrincipal) {
-    super(true);
-    catalogServiceId_ = catalogServiceId;
-    tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads);
-    loadInBackground_ = loadInBackground;
-    try {
-      // We want only 'true' HDFS filesystems to poll the HDFS cache (i.e not S3,
-      // local, etc.)
-      if (FileSystemUtil.getDefaultFileSystem() instanceof DistributedFileSystem) {
-        cachePoolReader_.scheduleAtFixedRate(
-            new CachePoolReader(), 0, 1, TimeUnit.MINUTES);
-      }
-    } catch (IOException e) {
-      LOG.error("Couldn't identify the default FS. Cache Pool reader will be disabled.");
-    }
-    if (sentryConfig != null) {
-      sentryProxy_ = new SentryProxy(sentryConfig, this, kerberosPrincipal);
-    } else {
-      sentryProxy_ = null;
-    }
-  }
-
-  /**
-   * Reads the current set of cache pools from HDFS and updates the catalog.
-   * Called periodically by the cachePoolReader_.
-   */
-  protected class CachePoolReader implements Runnable {
-
-    /**
-     * This constructor is needed to create a non-threaded execution of the class.
-     */
-    public CachePoolReader() {
-      super();
-    }
-
-    public void run() {
-      LOG.trace("Reloading cache pool names from HDFS");
-      // Map of cache pool name to CachePoolInfo. Stored in a map to allow Set operations
-      // to be performed on the keys.
-      Map<String, CachePoolInfo> currentCachePools = Maps.newHashMap();
-      try {
-        DistributedFileSystem dfs = FileSystemUtil.getDistributedFileSystem();
-        RemoteIterator<CachePoolEntry> itr = dfs.listCachePools();
-        while (itr.hasNext()) {
-          CachePoolInfo cachePoolInfo = itr.next().getInfo();
-          currentCachePools.put(cachePoolInfo.getPoolName(), cachePoolInfo);
-        }
-      } catch (Exception e) {
-        LOG.error("Error loading cache pools: ", e);
-        return;
-      }
-
-      catalogLock_.writeLock().lock();
-      try {
-        // Determine what has changed relative to what we have cached.
-        Set<String> droppedCachePoolNames = Sets.difference(
-            hdfsCachePools_.keySet(), currentCachePools.keySet());
-        Set<String> createdCachePoolNames = Sets.difference(
-            currentCachePools.keySet(), hdfsCachePools_.keySet());
-        // Add all new cache pools.
-        for (String createdCachePool: createdCachePoolNames) {
-          HdfsCachePool cachePool = new HdfsCachePool(
-              currentCachePools.get(createdCachePool));
-          cachePool.setCatalogVersion(
-              CatalogServiceCatalog.this.incrementAndGetCatalogVersion());
-          hdfsCachePools_.add(cachePool);
-        }
-        // Remove dropped cache pools.
-        for (String cachePoolName: droppedCachePoolNames) {
-          hdfsCachePools_.remove(cachePoolName);
-          CatalogServiceCatalog.this.incrementAndGetCatalogVersion();
-        }
-      } finally {
-        catalogLock_.writeLock().unlock();
-      }
-    }
-  }
-
-  /**
-   * Adds a list of cache directive IDs for the given table name. Asynchronously
-   * refreshes the table metadata once all cache directives complete.
-   */
-  public void watchCacheDirs(List<Long> dirIds, TTableName tblName) {
-    tableLoadingMgr_.watchCacheDirs(dirIds, tblName);
-  }
-
-  /**
-   * Prioritizes the loading of the given list TCatalogObjects. Currently only support
-   * loading Table/View metadata since Db and Function metadata is not loaded lazily.
-   */
-  public void prioritizeLoad(List<TCatalogObject> objectDescs) {
-    for (TCatalogObject catalogObject: objectDescs) {
-      Preconditions.checkState(catalogObject.isSetTable());
-      TTable table = catalogObject.getTable();
-      tableLoadingMgr_.prioritizeLoad(new TTableName(table.getDb_name().toLowerCase(),
-          table.getTbl_name().toLowerCase()));
-    }
-  }
-
-  /**
-   * Returns all known objects in the Catalog (Tables, Views, Databases, and
-   * Functions). Some metadata may be skipped for objects that have a catalog
-   * version < the specified "fromVersion". Takes a lock on the catalog to ensure this
-   * update contains a consistent snapshot of all items in the catalog. While holding the
-   * catalog lock, it locks each accessed table to protect against concurrent
-   * modifications.
-   */
-  public TGetAllCatalogObjectsResponse getCatalogObjects(long fromVersion) {
-    TGetAllCatalogObjectsResponse resp = new TGetAllCatalogObjectsResponse();
-    resp.setObjects(new ArrayList<TCatalogObject>());
-    resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);
-    catalogLock_.readLock().lock();
-    try {
-      for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
-        TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE,
-            db.getCatalogVersion());
-        catalogDb.setDb(db.toThrift());
-        resp.addToObjects(catalogDb);
-
-        for (String tblName: db.getAllTableNames()) {
-          TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
-              Catalog.INITIAL_CATALOG_VERSION);
-
-          Table tbl = db.getTable(tblName);
-          if (tbl == null) {
-            LOG.error("Table: " + tblName + " was expected to be in the catalog " +
-                "cache. Skipping table for this update.");
-            continue;
-          }
-
-          // Protect the table from concurrent modifications.
-          synchronized(tbl) {
-            // Only add the extended metadata if this table's version is >=
-            // the fromVersion.
-            if (tbl.getCatalogVersion() >= fromVersion) {
-              try {
-                catalogTbl.setTable(tbl.toThrift());
-              } catch (Exception e) {
-                LOG.debug(String.format("Error calling toThrift() on table %s.%s: %s",
-                    db.getName(), tblName, e.getMessage()), e);
-                continue;
-              }
-              catalogTbl.setCatalog_version(tbl.getCatalogVersion());
-            } else {
-              catalogTbl.setTable(new TTable(db.getName(), tblName));
-            }
-          }
-          resp.addToObjects(catalogTbl);
-        }
-
-        for (Function fn: db.getFunctions(null, new PatternMatcher())) {
-          TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
-              fn.getCatalogVersion());
-          function.setFn(fn.toThrift());
-          resp.addToObjects(function);
-        }
-      }
-
-      for (DataSource dataSource: getDataSources()) {
-        TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
-            dataSource.getCatalogVersion());
-        catalogObj.setData_source(dataSource.toThrift());
-        resp.addToObjects(catalogObj);
-      }
-      for (HdfsCachePool cachePool: hdfsCachePools_) {
-        TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
-            cachePool.getCatalogVersion());
-        pool.setCache_pool(cachePool.toThrift());
-        resp.addToObjects(pool);
-      }
-
-      // Get all roles
-      for (Role role: authPolicy_.getAllRoles()) {
-        TCatalogObject thriftRole = new TCatalogObject();
-        thriftRole.setRole(role.toThrift());
-        thriftRole.setCatalog_version(role.getCatalogVersion());
-        thriftRole.setType(role.getCatalogObjectType());
-        resp.addToObjects(thriftRole);
-
-        for (RolePrivilege p: role.getPrivileges()) {
-          TCatalogObject privilege = new TCatalogObject();
-          privilege.setPrivilege(p.toThrift());
-          privilege.setCatalog_version(p.getCatalogVersion());
-          privilege.setType(p.getCatalogObjectType());
-          resp.addToObjects(privilege);
-        }
-      }
-
-      // Each update should contain a single "TCatalog" object which is used to
-      // pass overall state on the catalog, such as the current version and the
-      // catalog service id.
-      TCatalogObject catalog = new TCatalogObject();
-      catalog.setType(TCatalogObjectType.CATALOG);
-      // By setting the catalog version to the latest catalog version at this point,
-      // it ensure impalads will always bump their versions, even in the case where
-      // an object has been dropped.
-      catalog.setCatalog_version(getCatalogVersion());
-      catalog.setCatalog(new TCatalog(catalogServiceId_));
-      resp.addToObjects(catalog);
-
-      // The max version is the max catalog version of all items in the update.
-      resp.setMax_catalog_version(getCatalogVersion());
-      return resp;
-    } finally {
-      catalogLock_.readLock().unlock();
-    }
-  }
-
-  /**
-   * Returns all user defined functions (aggregate and scalar) in the specified database.
-   * Functions are not returned in a defined order.
-   */
-  public List<Function> getFunctions(String dbName) throws DatabaseNotFoundException {
-    Db db = getDb(dbName);
-    if (db == null) {
-      throw new DatabaseNotFoundException("Database does not exist: " + dbName);
-    }
-
-    // Contains map of overloaded function names to all functions matching that name.
-    HashMap<String, List<Function>> dbFns = db.getAllFunctions();
-    List<Function> fns = new ArrayList<Function>(dbFns.size());
-    for (List<Function> fnOverloads: dbFns.values()) {
-      for (Function fn: fnOverloads) {
-        fns.add(fn);
-      }
-    }
-    return fns;
-  }
-
-  /**
-   * Checks if the Hive function 'fn' is Impala compatible. A function is Impala
-   * compatible iff
-   *
-   * 1. The function is JAVA based,
-   * 2. Has exactly one binary resource associated (We don't support loading
-   *    dependencies yet) and
-   * 3. The binary is of type JAR.
-   *
-   * Returns true if compatible and false otherwise. In case of incompatible
-   * functions 'incompatMsg' has the reason for the incompatibility.
-   * */
-   public static boolean isFunctionCompatible(
-       org.apache.hadoop.hive.metastore.api.Function fn, StringBuilder incompatMsg) {
-    boolean isCompatible = true;
-    if (fn.getFunctionType() != FunctionType.JAVA) {
-      isCompatible = false;
-      incompatMsg.append("Function type: " + fn.getFunctionType().name()
-          + " is not supported. Only " + FunctionType.JAVA.name() + " functions "
-          + "are supported.");
-    } else if (fn.getResourceUrisSize() == 0) {
-      isCompatible = false;
-      incompatMsg.append("No executable binary resource (like a JAR file) is " +
-          "associated with this function. To fix this, recreate the function by " +
-          "specifying a 'location' in the function create statement.");
-    } else if (fn.getResourceUrisSize() != 1) {
-      isCompatible = false;
-      List<String> resourceUris = Lists.newArrayList();
-      for (ResourceUri resource: fn.getResourceUris()) {
-        resourceUris.add(resource.getUri());
-      }
-      incompatMsg.append("Impala does not support multiple Jars for dependencies."
-          + "(" + Joiner.on(",").join(resourceUris) + ") ");
-    } else if (fn.getResourceUris().get(0).getResourceType() != ResourceType.JAR) {
-      isCompatible = false;
-      incompatMsg.append("Function binary type: " +
-        fn.getResourceUris().get(0).getResourceType().name()
-        + " is not supported. Only " + ResourceType.JAR.name()
-        + " type is supported.");
-    }
-    return isCompatible;
-  }
-
-  /**
-   * Returns a list of Impala Functions, one per compatible "evaluate" method in the UDF
-   * class referred to by the given Java function. This method copies the UDF Jar
-   * referenced by "function" to a temporary file in "LOCAL_LIBRARY_PATH" and loads it
-   * into the jvm. Then we scan all the methods in the class using reflection and extract
-   * those methods and create corresponding Impala functions. Currently Impala supports
-   * only "JAR" files for symbols and also a single Jar containing all the dependent
-   * classes rather than a set of Jar files.
-   */
-  public static List<Function> extractFunctions(String db,
-      org.apache.hadoop.hive.metastore.api.Function function)
-      throws ImpalaRuntimeException{
-    List<Function> result = Lists.newArrayList();
-    List<String> addedSignatures = Lists.newArrayList();
-    StringBuilder warnMessage = new StringBuilder();
-    if (!isFunctionCompatible(function, warnMessage)) {
-      LOG.warn("Skipping load of incompatible function: " +
-          function.getFunctionName() + ". " + warnMessage.toString());
-      return result;
-    }
-    String jarUri = function.getResourceUris().get(0).getUri();
-    Class<?> udfClass = null;
-    try {
-      Path localJarPath = new Path(LOCAL_LIBRARY_PATH,
-          UUID.randomUUID().toString() + ".jar");
-      try {
-        FileSystemUtil.copyToLocal(new Path(jarUri), localJarPath);
-      } catch (IOException e) {
-        String errorMsg = "Error loading Java function: " + db + "." +
-            function.getFunctionName() + ". Couldn't copy " + jarUri +
-            " to local path: " + localJarPath.toString();
-        LOG.error(errorMsg, e);
-        throw new ImpalaRuntimeException(errorMsg);
-      }
-      URL[] classLoaderUrls = new URL[] {new URL(localJarPath.toString())};
-      URLClassLoader urlClassLoader = new URLClassLoader(classLoaderUrls);
-      udfClass = urlClassLoader.loadClass(function.getClassName());
-      // Check if the class is of UDF type. Currently we don't support other functions
-      // TODO: Remove this once we support Java UDAF/UDTF
-      if (FunctionUtils.getUDFClassType(udfClass) != FunctionUtils.UDFClassType.UDF) {
-        LOG.warn("Ignoring load of incompatible Java function: " +
-            function.getFunctionName() + " as " + FunctionUtils.getUDFClassType(udfClass)
-            + " is not a supported type. Only UDFs are supported");
-        return result;
-      }
-      // Load each method in the UDF class and create the corresponding Impala Function
-      // object.
-      for (Method m: udfClass.getMethods()) {
-        if (!m.getName().equals(UdfExecutor.UDF_FUNCTION_NAME)) continue;
-        Function fn = ScalarFunction.fromHiveFunction(db,
-            function.getFunctionName(), function.getClassName(),
-            m.getParameterTypes(), m.getReturnType(), jarUri);
-        if (fn == null) {
-          LOG.warn("Ignoring incompatible method: " + m.toString() + " during load of " +
-             "Hive UDF:" + function.getFunctionName() + " from " + udfClass);
-          continue;
-        }
-        if (!addedSignatures.contains(fn.signatureString())) {
-          result.add(fn);
-          addedSignatures.add(fn.signatureString());
-        }
-      }
-    } catch (ClassNotFoundException c) {
-      String errorMsg = "Error loading Java function: " + db + "." +
-          function.getFunctionName() + ". Symbol class " + udfClass +
-          "not found in Jar: " + jarUri;
-      LOG.error(errorMsg);
-      throw new ImpalaRuntimeException(errorMsg, c);
-    } catch (Exception e) {
-      LOG.error("Skipping function load: " + function.getFunctionName(), e);
-      throw new ImpalaRuntimeException("Error extracting functions", e);
-    } catch (LinkageError e) {
-      String errorMsg = "Error resolving dependencies for Java function: " + db + "." +
-          function.getFunctionName();
-      LOG.error(errorMsg);
-      throw new ImpalaRuntimeException(errorMsg, e);
-    }
-    return result;
-  }
-
- /**
-   * Extracts Impala functions stored in metastore db parameters and adds them to
-   * the catalog cache.
-   */
-  private void loadFunctionsFromDbParams(Db db,
-      org.apache.hadoop.hive.metastore.api.Database msDb) {
-    if (msDb == null || msDb.getParameters() == null) return;
-    LOG.info("Loading native functions for database: " + db.getName());
-    TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
-    for (String key: msDb.getParameters().keySet()) {
-      if (!key.startsWith(Db.FUNCTION_INDEX_PREFIX)) continue;
-      try {
-        TFunction fn = new TFunction();
-        JniUtil.deserializeThrift(protocolFactory, fn,
-            Base64.decodeBase64(msDb.getParameters().get(key)));
-        Function addFn = Function.fromThrift(fn);
-        db.addFunction(addFn, false);
-        addFn.setCatalogVersion(incrementAndGetCatalogVersion());
-      } catch (ImpalaException e) {
-        LOG.error("Encountered an error during function load: key=" + key
-            + ",continuing", e);
-      }
-    }
-  }
-
-  /**
-   * Loads Java functions into the catalog. For each function in "functions",
-   * we extract all Impala compatible evaluate() signatures and load them
-   * as separate functions in the catalog.
-   */
-  private void loadJavaFunctions(Db db,
-      List<org.apache.hadoop.hive.metastore.api.Function> functions) {
-    Preconditions.checkNotNull(functions);
-    LOG.info("Loading Java functions for database: " + db.getName());
-    for (org.apache.hadoop.hive.metastore.api.Function function: functions) {
-      try {
-        for (Function fn: extractFunctions(db.getName(), function)) {
-          db.addFunction(fn);
-          fn.setCatalogVersion(incrementAndGetCatalogVersion());
-        }
-      } catch (Exception e) {
-        LOG.error("Skipping function load: " + function.getFunctionName(), e);
-      }
-    }
-  }
-
-  /**
-   * Invalidates the database 'db'. This method can have potential race
-   * conditions with external changes to the Hive metastore and hence any
-   * conflicting changes to the objects can manifest in the form of exceptions
-   * from the HMS calls which are appropriately handled. Returns the invalidated
-   * 'Db' object along with list of tables to be loaded by the TableLoadingMgr.
-   * Returns null if the method encounters an exception during invalidation.
-   */
-  private Pair<Db, List<TTableName>> invalidateDb(
-      MetaStoreClient msClient, String dbName, Db existingDb) {
-    try {
-      List<org.apache.hadoop.hive.metastore.api.Function> javaFns =
-          Lists.newArrayList();
-      for (String javaFn: msClient.getHiveClient().getFunctions(dbName, "*")) {
-        javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn));
-      }
-      org.apache.hadoop.hive.metastore.api.Database msDb =
-          msClient.getHiveClient().getDatabase(dbName);
-      Db newDb = new Db(dbName, this, msDb);
-      // existingDb is usually null when the Catalog loads for the first time.
-      // In that case we needn't restore any transient functions.
-      if (existingDb != null) {
-        // Restore UDFs that aren't persisted. They are only cleaned up on
-        // Catalog restart.
-        for (Function fn: existingDb.getTransientFunctions()) {
-          newDb.addFunction(fn);
-          fn.setCatalogVersion(incrementAndGetCatalogVersion());
-        }
-      }
-      // Reload native UDFs.
-      loadFunctionsFromDbParams(newDb, msDb);
-      // Reload Java UDFs from HMS.
-      loadJavaFunctions(newDb, javaFns);
-      newDb.setCatalogVersion(incrementAndGetCatalogVersion());
-
-      List<TTableName> tblsToBackgroundLoad = Lists.newArrayList();
-      for (String tableName: msClient.getHiveClient().getAllTables(dbName)) {
-        Table incompleteTbl = IncompleteTable.createUninitializedTable(
-            getNextTableId(), newDb, tableName);
-        incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion());
-        newDb.addTable(incompleteTbl);
-        if (loadInBackground_) {
-          tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase()));
-        }
-      }
-      return Pair.create(newDb, tblsToBackgroundLoad);
-    } catch (Exception e) {
-      LOG.warn("Encountered an exception while invalidating database: " + dbName +
-          ". Ignoring further load of this db.", e);
-    }
-    return null;
-  }
-
-  /**
-   * Resets this catalog instance by clearing all cached table and database metadata.
-   */
-  public void reset() throws CatalogException {
-    // First update the policy metadata.
-    if (sentryProxy_ != null) {
-      // Sentry Service is enabled.
-      try {
-        // Update the authorization policy, waiting for the result to complete.
-        sentryProxy_.refresh();
-      } catch (Exception e) {
-        throw new CatalogException("Error updating authorization policy: ", e);
-      }
-    }
-
-    catalogLock_.writeLock().lock();
-    try {
-      nextTableId_.set(0);
-
-      // Not all Java UDFs are persisted to the metastore. The ones which aren't
-      // should be restored once the catalog has been invalidated.
-      Map<String, Db> oldDbCache = dbCache_.get();
-
-      // Build a new DB cache, populate it, and replace the existing cache in one
-      // step.
-      ConcurrentHashMap<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();
-      List<TTableName> tblsToBackgroundLoad = Lists.newArrayList();
-      try (MetaStoreClient msClient = getMetaStoreClient()) {
-        for (String dbName: msClient.getHiveClient().getAllDatabases()) {
-          dbName = dbName.toLowerCase();
-          Db oldDb = oldDbCache.get(dbName);
-          Pair<Db, List<TTableName>> invalidatedDb = invalidateDb(msClient,
-              dbName, oldDb);
-          if (invalidatedDb == null) continue;
-          newDbCache.put(dbName, invalidatedDb.first);
-          tblsToBackgroundLoad.addAll(invalidatedDb.second);
-        }
-      }
-      dbCache_.set(newDbCache);
-      // Submit tables for background loading.
-      for (TTableName tblName: tblsToBackgroundLoad) {
-        tableLoadingMgr_.backgroundLoad(tblName);
-      }
-    } catch (Exception e) {
-      LOG.error(e);
-      throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Adds a database name to the metadata cache and returns the database's
-   * new Db object. Used by CREATE DATABASE statements.
-   */
-  public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb)
-      throws ImpalaException {
-    Db newDb = new Db(dbName, this, msDb);
-    newDb.setCatalogVersion(incrementAndGetCatalogVersion());
-    addDb(newDb);
-    return newDb;
-  }
-
-  /**
-   * Removes a database from the metadata cache and returns the removed database,
-   * or null if the database did not exist in the cache.
-   * Used by DROP DATABASE statements.
-   */
-  @Override
-  public Db removeDb(String dbName) {
-    Db removedDb = super.removeDb(dbName);
-    if (removedDb != null) {
-      removedDb.setCatalogVersion(incrementAndGetCatalogVersion());
-    }
-    return removedDb;
-  }
-
-  /**
-   * Adds a table with the given name to the catalog and returns the new table,
-   * loading the metadata if needed.
-   */
-  public Table addTable(String dbName, String tblName) throws TableNotFoundException {
-    Db db = getDb(dbName);
-    if (db == null) return null;
-    Table incompleteTable =
-        IncompleteTable.createUninitializedTable(getNextTableId(), db, tblName);
-    incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
-    db.addTable(incompleteTable);
-    return db.getTable(tblName);
-  }
-
-  /**
-   * Gets the table with the given name, loading it if needed (if the existing catalog
-   * object is not yet loaded). Returns the matching Table or null if no table with this
-   * name exists in the catalog.
-   * If the existing table is dropped or modified (indicated by the catalog version
-   * changing) while the load is in progress, the loaded value will be discarded
-   * and the current cached value will be returned. This may mean that a missing table
-   * (not yet loaded table) will be returned.
-   */
-  public Table getOrLoadTable(String dbName, String tblName)
-      throws CatalogException {
-    TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
-    TableLoadingMgr.LoadRequest loadReq;
-
-    long previousCatalogVersion;
-    // Return the table if it is already loaded or submit a new load request.
-    catalogLock_.readLock().lock();
-    try {
-      Table tbl = getTable(dbName, tblName);
-      if (tbl == null || tbl.isLoaded()) return tbl;
-      previousCatalogVersion = tbl.getCatalogVersion();
-      loadReq = tableLoadingMgr_.loadAsync(tableName);
-    } finally {
-      catalogLock_.readLock().unlock();
-    }
-    Preconditions.checkNotNull(loadReq);
-    try {
-      // The table may have been dropped/modified while the load was in progress, so only
-      // apply the update if the existing table hasn't changed.
-      return replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
-    } finally {
-      loadReq.close();
-    }
-  }
-
-  /**
-   * Replaces an existing Table with a new value if it exists and has not changed
-   * (has the same catalog version as 'expectedCatalogVersion').
-   */
-  private Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion)
-      throws DatabaseNotFoundException {
-    catalogLock_.writeLock().lock();
-    try {
-      Db db = getDb(updatedTbl.getDb().getName());
-      if (db == null) {
-        throw new DatabaseNotFoundException(
-            "Database does not exist: " + updatedTbl.getDb().getName());
-      }
-
-      Table existingTbl = db.getTable(updatedTbl.getName());
-      // The existing table does not exist or has been modified. Instead of
-      // adding the loaded value, return the existing table.
-      if (existingTbl == null ||
-          existingTbl.getCatalogVersion() != expectedCatalogVersion) return existingTbl;
-
-      updatedTbl.setCatalogVersion(incrementAndGetCatalogVersion());
-      db.addTable(updatedTbl);
-      return updatedTbl;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Removes a table from the catalog and increments the catalog version.
-   * Returns the removed Table, or null if the table or db does not exist.
-   */
-  public Table removeTable(String dbName, String tblName) {
-    Db parentDb = getDb(dbName);
-    if (parentDb == null) return null;
-
-    Table removedTable = parentDb.removeTable(tblName);
-    if (removedTable != null) {
-      removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
-    }
-    return removedTable;
-  }
-
-  /**
-   * Removes a function from the catalog. Increments the catalog version and returns
-   * the Function object that was removed. If the function did not exist, null will
-   * be returned.
-   */
-  @Override
-  public Function removeFunction(Function desc) {
-    Function removedFn = super.removeFunction(desc);
-    if (removedFn != null) {
-      removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
-    }
-    return removedFn;
-  }
-
-  /**
-   * Adds a function from the catalog, incrementing the catalog version. Returns true if
-   * the add was successful, false otherwise.
-   */
-  @Override
-  public boolean addFunction(Function fn) {
-    Db db = getDb(fn.getFunctionName().getDb());
-    if (db == null) return false;
-    if (db.addFunction(fn)) {
-      fn.setCatalogVersion(incrementAndGetCatalogVersion());
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Adds a data source to the catalog, incrementing the catalog version. Returns true
-   * if the add was successful, false otherwise.
-   */
-  @Override
-  public boolean addDataSource(DataSource dataSource) {
-    if (dataSources_.add(dataSource)) {
-      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public DataSource removeDataSource(String dataSourceName) {
-    DataSource dataSource = dataSources_.remove(dataSourceName);
-    if (dataSource != null) {
-      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
-    }
-    return dataSource;
-  }
-
-  /**
-   * Returns the table parameter 'transient_lastDdlTime', or -1 if it's not set.
-   * TODO: move this to a metastore helper class.
-   */
-  public static long getLastDdlTime(org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    Preconditions.checkNotNull(msTbl);
-    Map<String, String> params = msTbl.getParameters();
-    String lastDdlTimeStr = params.get("transient_lastDdlTime");
-    if (lastDdlTimeStr != null) {
-      try {
-        return Long.parseLong(lastDdlTimeStr);
-      } catch (NumberFormatException e) {}
-    }
-    return -1;
-  }
-
-  /**
-   * Updates the cached lastDdlTime for the given table. The lastDdlTime is used during
-   * the metadata refresh() operations to determine if there have been any external
-   * (outside of Impala) modifications to the table.
-   */
-  public void updateLastDdlTime(TTableName tblName, long ddlTime) {
-    Db db = getDb(tblName.getDb_name());
-    if (db == null) return;
-    Table tbl = db.getTable(tblName.getTable_name());
-    if (tbl == null) return;
-    tbl.updateLastDdlTime(ddlTime);
-  }
-
-  /**
-   * Renames a table. Equivalent to an atomic drop + add of the table. Returns
-   * the new Table object with an incremented catalog version or null if operation
-   * was not successful.
-   */
-  public Table renameTable(TTableName oldTableName, TTableName newTableName)
-      throws CatalogException {
-    // Remove the old table name from the cache and add the new table.
-    Db db = getDb(oldTableName.getDb_name());
-    if (db != null) db.removeTable(oldTableName.getTable_name());
-    return addTable(newTableName.getDb_name(), newTableName.getTable_name());
-  }
-
-  /**
-   * Reloads metadata for table 'tbl'. If 'tbl' is an IncompleteTable, it makes an
-   * asynchronous request to the table loading manager to create a proper table instance
-   * and load the metadata from Hive Metastore. Otherwise, it updates table metadata
-   * in-place by calling the load() function on the specified table. Returns 'tbl', if it
-   * is a fully loaded table (e.g. HdfsTable, HBaseTable, etc). Otherwise, returns a
-   * newly constructed fully loaded table. Applies proper synchronization to protect the
-   * metadata load from concurrent table modifications and assigns a new catalog version.
-   * Throws a CatalogException if there is an error loading table metadata.
-   */
-  public Table reloadTable(Table tbl) throws CatalogException {
-    LOG.debug(String.format("Refreshing table metadata: %s", tbl.getFullName()));
-    TTableName tblName = new TTableName(tbl.getDb().getName().toLowerCase(),
-        tbl.getName().toLowerCase());
-    Db db = tbl.getDb();
-    if (tbl instanceof IncompleteTable) {
-      TableLoadingMgr.LoadRequest loadReq;
-      long previousCatalogVersion;
-      // Return the table if it is already loaded or submit a new load request.
-      catalogLock_.readLock().lock();
-      try {
-        previousCatalogVersion = tbl.getCatalogVersion();
-        loadReq = tableLoadingMgr_.loadAsync(tblName);
-      } finally {
-        catalogLock_.readLock().unlock();
-      }
-      Preconditions.checkNotNull(loadReq);
-      try {
-        // The table may have been dropped/modified while the load was in progress, so
-        // only apply the update if the existing table hasn't changed.
-        return replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
-      } finally {
-        loadReq.close();
-      }
-    }
-
-    catalogLock_.writeLock().lock();
-    synchronized(tbl) {
-      long newCatalogVersion = incrementAndGetCatalogVersion();
-      catalogLock_.writeLock().unlock();
-      try (MetaStoreClient msClient = getMetaStoreClient()) {
-        org.apache.hadoop.hive.metastore.api.Table msTbl = null;
-        try {
-          msTbl = msClient.getHiveClient().getTable(db.getName(),
-              tblName.getTable_name());
-        } catch (Exception e) {
-          throw new TableLoadingException("Error loading metadata for table: " +
-              db.getName() + "." + tblName.getTable_name(), e);
-        }
-        tbl.load(true, msClient.getHiveClient(), msTbl);
-      }
-      tbl.setCatalogVersion(newCatalogVersion);
-      return tbl;
-    }
-  }
-
-  /**
-   * Reloads the metadata of a table with name 'tableName'. Returns the table or null if
-   * the table does not exist.
-   */
-  public Table reloadTable(TTableName tableName) throws CatalogException {
-    Table table = getTable(tableName.getDb_name(), tableName.getTable_name());
-    if (table == null) return null;
-    return reloadTable(table);
-  }
-
-  /**
-   * Drops the partition specified in 'partitionSpec' from 'tbl'. Throws a
-   * CatalogException if 'tbl' is not an HdfsTable. If the partition having the given
-   * partition spec does not exist, null is returned. Otherwise, the modified table is
-   * returned.
-   */
-  public Table dropPartition(Table tbl, List<TPartitionKeyValue> partitionSpec)
-      throws CatalogException {
-    Preconditions.checkNotNull(tbl);
-    Preconditions.checkNotNull(partitionSpec);
-    Preconditions.checkState(Thread.holdsLock(tbl));
-    if (!(tbl instanceof HdfsTable)) {
-      throw new CatalogException("Table " + tbl.getFullName() + " is not an Hdfs table");
-    }
-    HdfsTable hdfsTable = (HdfsTable) tbl;
-    if (hdfsTable.dropPartition(partitionSpec) == null) return null;
-    return hdfsTable;
-  }
-
-  /**
-   * Adds a partition to its HdfsTable and returns the modified table.
-   */
-  public Table addPartition(HdfsPartition partition) throws CatalogException {
-    Preconditions.checkNotNull(partition);
-    HdfsTable hdfsTable = partition.getTable();
-    Db db = getDb(hdfsTable.getDb().getName());
-    hdfsTable.addPartition(partition);
-    return hdfsTable;
-  }
-
-  /**
-   * Invalidates the table in the catalog cache, potentially adding/removing the table
-   * from the cache based on whether it exists in the Hive Metastore.
-   * The invalidation logic is:
-   * - If the table exists in the metastore, add it to the catalog as an uninitialized
-   *   IncompleteTable (replacing any existing entry). The table metadata will be
-   *   loaded lazily, on the next access. If the parent database for this table does not
-   *   yet exist in Impala's cache it will also be added.
-   * - If the table does not exist in the metastore, remove it from the catalog cache.
-   * - If we are unable to determine whether the table exists in the metastore (there was
-   *   an exception thrown making the RPC), invalidate any existing Table by replacing
-   *   it with an uninitialized IncompleteTable.
-   *
-   * The parameter updatedObjects is a Pair that contains details on what catalog objects
-   * were modified as a result of the invalidateTable() call. The first item in the Pair
-   * is a Db which will only be set if a new database was added as a result of this call,
-   * otherwise it will be null. The second item in the Pair is the Table that was
-   * modified/added/removed.
-   * Returns a flag that indicates whether the items in updatedObjects were removed
-   * (returns true) or added/modified (return false). Only Tables should ever be removed.
-   */
-  public boolean invalidateTable(TTableName tableName, Pair<Db, Table> updatedObjects) {
-    Preconditions.checkNotNull(updatedObjects);
-    updatedObjects.first = null;
-    updatedObjects.second = null;
-    LOG.debug(String.format("Invalidating table metadata: %s.%s",
-        tableName.getDb_name(), tableName.getTable_name()));
-    String dbName = tableName.getDb_name();
-    String tblName = tableName.getTable_name();
-
-    // Stores whether the table exists in the metastore. Can have three states:
-    // 1) true - Table exists in metastore.
-    // 2) false - Table does not exist in metastore.
-    // 3) unknown (null) - There was exception thrown by the metastore client.
-    Boolean tableExistsInMetaStore;
-    Db db = null;
-    try (MetaStoreClient msClient = getMetaStoreClient()) {
-      org.apache.hadoop.hive.metastore.api.Database msDb = null;
-      try {
-        tableExistsInMetaStore = msClient.getHiveClient().tableExists(dbName, tblName);
-      } catch (UnknownDBException e) {
-        // The parent database does not exist in the metastore. Treat this the same
-        // as if the table does not exist.
-        tableExistsInMetaStore = false;
-      } catch (TException e) {
-        LOG.error("Error executing tableExists() metastore call: " + tblName, e);
-        tableExistsInMetaStore = null;
-      }
-
-      if (tableExistsInMetaStore != null && !tableExistsInMetaStore) {
-        updatedObjects.second = removeTable(dbName, tblName);
-        return true;
-      }
-
-      db = getDb(dbName);
-      if ((db == null || !db.containsTable(tblName)) && tableExistsInMetaStore == null) {
-        // The table does not exist in our cache AND it is unknown whether the
-        // table exists in the metastore. Do nothing.
-        return false;
-      } else if (db == null && tableExistsInMetaStore) {
-        // The table exists in the metastore, but our cache does not contain the parent
-        // database. A new db will be added to the cache along with the new table. msDb
-        // must be valid since tableExistsInMetaStore is true.
-        try {
-          msDb = msClient.getHiveClient().getDatabase(dbName);
-          Preconditions.checkNotNull(msDb);
-          db = new Db(dbName, this, msDb);
-          db.setCatalogVersion(incrementAndGetCatalogVersion());
-          addDb(db);
-          updatedObjects.first = db;
-        } catch (TException e) {
-          // The metastore database cannot be get. Log the error and return.
-          LOG.error("Error executing getDatabase() metastore call: " + dbName, e);
-          return false;
-        }
-      }
-    }
-
-    // Add a new uninitialized table to the table cache, effectively invalidating
-    // any existing entry. The metadata for the table will be loaded lazily, on the
-    // on the next access to the table.
-    Table newTable = IncompleteTable.createUninitializedTable(
-        getNextTableId(), db, tblName);
-    newTable.setCatalogVersion(incrementAndGetCatalogVersion());
-    db.addTable(newTable);
-    if (loadInBackground_) {
-      tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(),
-          tblName.toLowerCase()));
-    }
-    updatedObjects.second = newTable;
-    return false;
-  }
-
-  /**
-   * Adds a new role with the given name and grant groups to the AuthorizationPolicy.
-   * If a role with the same name already exists it will be overwritten.
-   */
-  public Role addRole(String roleName, Set<String> grantGroups) {
-    catalogLock_.writeLock().lock();
-    try {
-      Role role = new Role(roleName, grantGroups);
-      role.setCatalogVersion(incrementAndGetCatalogVersion());
-      authPolicy_.addRole(role);
-      return role;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Removes the role with the given name from the AuthorizationPolicy. Returns the
-   * removed role with an incremented catalog version, or null if no role with this name
-   * exists.
-   */
-  public Role removeRole(String roleName) {
-    catalogLock_.writeLock().lock();
-    try {
-      Role role = authPolicy_.removeRole(roleName);
-      if (role == null) return null;
-      role.setCatalogVersion(incrementAndGetCatalogVersion());
-      return role;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Adds a grant group to the given role name and returns the modified Role with
-   * an updated catalog version. If the role does not exist a CatalogException is thrown.
-   */
-  public Role addRoleGrantGroup(String roleName, String groupName)
-      throws CatalogException {
-    catalogLock_.writeLock().lock();
-    try {
-      Role role = authPolicy_.addGrantGroup(roleName, groupName);
-      Preconditions.checkNotNull(role);
-      role.setCatalogVersion(incrementAndGetCatalogVersion());
-      return role;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Removes a grant group from the given role name and returns the modified Role with
-   * an updated catalog version. If the role does not exist a CatalogException is thrown.
-   */
-  public Role removeRoleGrantGroup(String roleName, String groupName)
-      throws CatalogException {
-    catalogLock_.writeLock().lock();
-    try {
-      Role role = authPolicy_.removeGrantGroup(roleName, groupName);
-      Preconditions.checkNotNull(role);
-      role.setCatalogVersion(incrementAndGetCatalogVersion());
-      return role;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Adds a privilege to the given role name. Returns the new RolePrivilege and
-   * increments the catalog version. If the parent role does not exist a CatalogException
-   * is thrown.
-   */
-  public RolePrivilege addRolePrivilege(String roleName, TPrivilege thriftPriv)
-      throws CatalogException {
-    catalogLock_.writeLock().lock();
-    try {
-      Role role = authPolicy_.getRole(roleName);
-      if (role == null) throw new CatalogException("Role does not exist: " + roleName);
-      RolePrivilege priv = RolePrivilege.fromThrift(thriftPriv);
-      priv.setCatalogVersion(incrementAndGetCatalogVersion());
-      authPolicy_.addPrivilege(priv);
-      return priv;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Removes a RolePrivilege from the given role name. Returns the removed
-   * RolePrivilege with an incremented catalog version or null if no matching privilege
-   * was found. Throws a CatalogException if no role exists with this name.
-   */
-  public RolePrivilege removeRolePrivilege(String roleName, TPrivilege thriftPriv)
-      throws CatalogException {
-    catalogLock_.writeLock().lock();
-    try {
-      Role role = authPolicy_.getRole(roleName);
-      if (role == null) throw new CatalogException("Role does not exist: " + roleName);
-      RolePrivilege rolePrivilege =
-          role.removePrivilege(thriftPriv.getPrivilege_name());
-      if (rolePrivilege == null) return null;
-      rolePrivilege.setCatalogVersion(incrementAndGetCatalogVersion());
-      return rolePrivilege;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Gets a RolePrivilege from the given role name. Returns the privilege if it exists,
-   * or null if no privilege matching the privilege spec exist.
-   * Throws a CatalogException if the role does not exist.
-   */
-  public RolePrivilege getRolePrivilege(String roleName, TPrivilege privSpec)
-      throws CatalogException {
-    catalogLock_.readLock().lock();
-    try {
-      Role role = authPolicy_.getRole(roleName);
-      if (role == null) throw new CatalogException("Role does not exist: " + roleName);
-      return role.getPrivilege(privSpec.getPrivilege_name());
-    } finally {
-      catalogLock_.readLock().unlock();
-    }
-  }
-
-  /**
-   * Increments the current Catalog version and returns the new value.
-   */
-  public long incrementAndGetCatalogVersion() {
-    catalogLock_.writeLock().lock();
-    try {
-      return ++catalogVersion_;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Returns the current Catalog version.
-   */
-  public long getCatalogVersion() {
-    catalogLock_.readLock().lock();
-    try {
-      return catalogVersion_;
-    } finally {
-      catalogLock_.readLock().unlock();
-    }
-  }
-
-  public ReentrantReadWriteLock getLock() { return catalogLock_; }
-
-  /**
-   * Gets the next table ID and increments the table ID counter.
-   */
-  public TableId getNextTableId() { return new TableId(nextTableId_.getAndIncrement()); }
-  public SentryProxy getSentryProxy() { return sentryProxy_; }
-  public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
-
-  /**
-   * Reloads metadata for the partition defined by the partition spec
-   * 'partitionSpec' in table 'tbl'. Returns the table object with partition
-   * metadata reloaded
-   */
-  public Table reloadPartition(Table tbl, List<TPartitionKeyValue> partitionSpec)
-      throws CatalogException {
-    catalogLock_.writeLock().lock();
-    synchronized (tbl) {
-      long newCatalogVersion = incrementAndGetCatalogVersion();
-      catalogLock_.writeLock().unlock();
-      HdfsTable hdfsTable = (HdfsTable) tbl;
-      HdfsPartition hdfsPartition = hdfsTable
-          .getPartitionFromThriftPartitionSpec(partitionSpec);
-      // Retrieve partition name from existing partition or construct it from
-      // the partition spec
-      String partitionName = hdfsPartition == null
-          ? HdfsTable.constructPartitionName(partitionSpec)
-          : hdfsPartition.getPartitionName();
-      LOG.debug(String.format("Refreshing Partition metadata: %s %s",
-          hdfsTable.getFullName(), partitionName));
-      try (MetaStoreClient msClient = getMetaStoreClient()) {
-        org.apache.hadoop.hive.metastore.api.Partition hmsPartition = null;
-        try {
-          hmsPartition = msClient.getHiveClient().getPartition(
-              hdfsTable.getDb().getName(), hdfsTable.getName(), partitionName);
-        } catch (NoSuchObjectException e) {
-          // If partition does not exist in Hive Metastore, remove it from the
-          // catalog
-          if (hdfsPartition != null) {
-            hdfsTable.dropPartition(partitionSpec);
-            hdfsTable.setCatalogVersion(newCatalogVersion);
-          }
-          return hdfsTable;
-        } catch (Exception e) {
-          throw new CatalogException("Error loading metadata for partition: "
-              + hdfsTable.getFullName() + " " + partitionName, e);
-        }
-        hdfsTable.reloadPartition(hdfsPartition, hmsPartition);
-      }
-      hdfsTable.setCatalogVersion(newCatalogVersion);
-      return hdfsTable;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/Column.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Column.java b/fe/src/main/java/com/cloudera/impala/catalog/Column.java
deleted file mode 100644
index b2d7416..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/Column.java
+++ /dev/null
@@ -1,132 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.thrift.TColumn;
-import com.cloudera.impala.thrift.TColumnStats;
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Internal representation of column-related metadata.
- * Owned by Catalog instance.
- */
-public class Column {
-  private final static Logger LOG = LoggerFactory.getLogger(Column.class);
-
-  protected final String name_;
-  protected final Type type_;
-  protected final String comment_;
-  protected int position_;  // in table
-
-  protected final ColumnStats stats_;
-
-  public Column(String name, Type type, int position) {
-    this(name, type, null, position);
-  }
-
-  public Column(String name, Type type, String comment, int position) {
-    name_ = name;
-    type_ = type;
-    comment_ = comment;
-    position_ = position;
-    stats_ = new ColumnStats(type);
-  }
-
-  public String getComment() { return comment_; }
-  public String getName() { return name_; }
-  public Type getType() { return type_; }
-  public int getPosition() { return position_; }
-  public void setPosition(int position) { this.position_ = position; }
-  public ColumnStats getStats() { return stats_; }
-
-  public boolean updateStats(ColumnStatisticsData statsData) {
-    boolean statsDataCompatibleWithColType = stats_.update(type_, statsData);
-    LOG.debug("col stats: " + name_ + " #distinct=" + stats_.getNumDistinctValues());
-    return statsDataCompatibleWithColType;
-  }
-
-  public void updateStats(TColumnStats statsData) {
-    stats_.update(type_, statsData);
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this.getClass())
-                  .add("name_", name_)
-                  .add("type_", type_)
-                  .add("comment_", comment_)
-                  .add("stats", stats_)
-                  .add("position_", position_).toString();
-  }
-
-  public static Column fromThrift(TColumn columnDesc) {
-    String comment = columnDesc.isSetComment() ? columnDesc.getComment() : null;
-    Preconditions.checkState(columnDesc.isSetPosition());
-    int position = columnDesc.getPosition();
-    Column col;
-    if (columnDesc.isIs_hbase_column()) {
-      // HBase table column. The HBase column qualifier (column name) is not be set for
-      // the HBase row key, so it being set in the thrift struct is not a precondition.
-      Preconditions.checkState(columnDesc.isSetColumn_family());
-      Preconditions.checkState(columnDesc.isSetIs_binary());
-      col = new HBaseColumn(columnDesc.getColumnName(), columnDesc.getColumn_family(),
-          columnDesc.getColumn_qualifier(), columnDesc.isIs_binary(),
-          Type.fromThrift(columnDesc.getColumnType()), comment, position);
-    } else if (columnDesc.isIs_kudu_column()) {
-      Preconditions.checkState(columnDesc.isSetIs_key());
-      Preconditions.checkState(columnDesc.isSetIs_nullable());
-      col = new KuduColumn(columnDesc.getColumnName(), columnDesc.isIs_key(),
-          columnDesc.isIs_nullable(),
-          Type.fromThrift(columnDesc.getColumnType()), comment, position);
-    } else {
-      // Hdfs table column.
-      col = new Column(columnDesc.getColumnName(),
-          Type.fromThrift(columnDesc.getColumnType()), comment, position);
-    }
-    if (columnDesc.isSetCol_stats()) col.updateStats(columnDesc.getCol_stats());
-    return col;
-  }
-
-  public TColumn toThrift() {
-    TColumn colDesc = new TColumn(name_, type_.toThrift());
-    if (comment_ != null) colDesc.setComment(comment_);
-    colDesc.setPosition(position_);
-    colDesc.setCol_stats(getStats().toThrift());
-    return colDesc;
-  }
-
-  public static List<FieldSchema> toFieldSchemas(List<Column> columns) {
-    return Lists.transform(columns, new Function<Column, FieldSchema>() {
-      public FieldSchema apply(Column column) {
-        Preconditions.checkNotNull(column.getType());
-        return new FieldSchema(column.getName(), column.getType().toSql(),
-            column.getComment());
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/ColumnNotFoundException.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/ColumnNotFoundException.java b/fe/src/main/java/com/cloudera/impala/catalog/ColumnNotFoundException.java
deleted file mode 100644
index 4ea47c1..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/ColumnNotFoundException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-
-/**
- * Thrown when a column cannot be found in the catalog.
- */
-public class ColumnNotFoundException extends CatalogException {
-  // Dummy serial UID to avoid Eclipse warnings
-  private static final long serialVersionUID = -2203080667446640542L;
-
-  public ColumnNotFoundException(String s) { super(s); }
-
-  public ColumnNotFoundException(String s, Exception cause) { super(s, cause); }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/ColumnStats.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/ColumnStats.java b/fe/src/main/java/com/cloudera/impala/catalog/ColumnStats.java
deleted file mode 100644
index 8f8e4b3..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/ColumnStats.java
+++ /dev/null
@@ -1,334 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.Set;
-
-import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.SlotRef;
-import com.cloudera.impala.thrift.TColumnStats;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-
-/**
- * Statistics for a single column.
- */
-public class ColumnStats {
-  private final static Logger LOG = LoggerFactory.getLogger(ColumnStats.class);
-
-  // Set of the currently supported column stats column types.
-  private final static Set<PrimitiveType> SUPPORTED_COL_TYPES = Sets.newHashSet(
-      PrimitiveType.BIGINT, PrimitiveType.BINARY, PrimitiveType.BOOLEAN,
-      PrimitiveType.DOUBLE, PrimitiveType.FLOAT, PrimitiveType.INT,
-      PrimitiveType.SMALLINT, PrimitiveType.CHAR, PrimitiveType.VARCHAR,
-      PrimitiveType.STRING, PrimitiveType.TIMESTAMP, PrimitiveType.TINYINT,
-      PrimitiveType.DECIMAL);
-
-  public enum StatsKey {
-    NUM_DISTINCT_VALUES("numDVs"),
-    NUM_NULLS("numNulls"),
-    AVG_SIZE("avgSize"),
-    MAX_SIZE("maxSize");
-
-    private final String name_;
-
-    private StatsKey(String name) { name_ = name; }
-
-    /**
-     * Returns the StatsKey whose name matches 'key'. The comparison is
-     * case insensitive. Returns null if there is no matching StatsKey.
-     */
-    public static StatsKey fromString(String key) {
-      for (StatsKey k: values()) {
-        if (key.equalsIgnoreCase(k.name_)) return k;
-      }
-      return null;
-    }
-
-    @Override
-    public String toString() { return name_; }
-  }
-
-  // in bytes: excludes serialization overhead
-  private double avgSize_;
-  // in bytes; includes serialization overhead.
-  private double avgSerializedSize_;
-  private long maxSize_;  // in bytes
-  private long numDistinctValues_;
-  private long numNulls_;
-
-  public ColumnStats(Type colType) {
-    initColStats(colType);
-  }
-
-  /**
-   * C'tor for clone().
-   */
-  private ColumnStats(ColumnStats other) {
-    avgSize_ = other.avgSize_;
-    avgSerializedSize_ = other.avgSerializedSize_;
-    maxSize_ = other.maxSize_;
-    numDistinctValues_ = other.numDistinctValues_;
-    numNulls_ = other.numNulls_;
-  }
-
-  /**
-   * Initializes all column stats values as "unknown". For fixed-length type
-   * (those which don't need additional storage besides the slot they occupy),
-   * sets avgSerializedSize and maxSize to their slot size.
-   */
-  private void initColStats(Type colType) {
-    avgSize_ = -1;
-    avgSerializedSize_ = -1;
-    maxSize_ = -1;
-    numDistinctValues_ = -1;
-    numNulls_ = -1;
-    if (colType.isFixedLengthType()) {
-      avgSerializedSize_ = colType.getSlotSize();
-      avgSize_ = colType.getSlotSize();
-      maxSize_ = colType.getSlotSize();
-    }
-  }
-
-  /**
-   * Creates ColumnStats from the given expr. Sets numDistinctValues and if the expr
-   * is a SlotRef also numNulls.
-   */
-  public static ColumnStats fromExpr(Expr expr) {
-    Preconditions.checkNotNull(expr);
-    Preconditions.checkState(expr.getType().isValid());
-    ColumnStats stats = new ColumnStats(expr.getType());
-    stats.setNumDistinctValues(expr.getNumDistinctValues());
-    SlotRef slotRef = expr.unwrapSlotRef(false);
-    if (slotRef == null) return stats;
-    ColumnStats slotStats = slotRef.getDesc().getStats();
-    if (slotStats == null) return stats;
-    stats.numNulls_ = slotStats.getNumNulls();
-    stats.avgSerializedSize_ = slotStats.getAvgSerializedSize();
-    stats.avgSize_ = slotStats.getAvgSize();
-    stats.maxSize_ = slotStats.getMaxSize();
-    return stats;
-  }
-
-  /**
-   * Adds other's numDistinctValues and numNulls to this ColumnStats.
-   * If this or other's stats are invalid, sets the corresponding stat to invalid,
-   * Returns this with the updated stats.
-   * This method is used to aggregate stats for slots that originate from multiple
-   * source slots, e.g., those produced by union queries.
-   */
-  public ColumnStats add(ColumnStats other) {
-    if (numDistinctValues_ == -1 || other.numDistinctValues_ == -1) {
-      numDistinctValues_ = -1;
-    } else {
-      numDistinctValues_ += other.numDistinctValues_;
-    }
-    if (numNulls_ == -1 || other.numNulls_ == -1) {
-      numNulls_ = -1;
-    } else {
-      numNulls_ += other.numNulls_;
-    }
-    return this;
-  }
-
-  public void setAvgSize(float avgSize) { avgSize_ = avgSize; }
-  public void setAvgSerializedSize(float avgSize) { avgSerializedSize_ = avgSize; }
-  public void setMaxSize(long maxSize) { maxSize_ = maxSize; }
-  public long getNumDistinctValues() { return numDistinctValues_; }
-  public void setNumDistinctValues(long numDistinctValues) {
-    this.numDistinctValues_ = numDistinctValues;
-  }
-  public void setNumNulls(long numNulls) { numNulls_ = numNulls; }
-  public double getAvgSerializedSize() { return avgSerializedSize_; }
-  public double getAvgSize() { return avgSize_; }
-  public long getMaxSize() { return maxSize_; }
-  public boolean hasNulls() { return numNulls_ > 0; }
-  public long getNumNulls() { return numNulls_; }
-  public boolean hasAvgSerializedSize() { return avgSerializedSize_ >= 0; }
-  public boolean hasMaxSize() { return maxSize_ >= 0; }
-  public boolean hasNumDistinctValues() { return numDistinctValues_ >= 0; }
-  public boolean hasStats() { return numNulls_ != -1 || numDistinctValues_ != -1; }
-
-  /**
-   * Updates the stats with the given ColumnStatisticsData. If the ColumnStatisticsData
-   * is not compatible with the given colType, all stats are initialized based on
-   * initColStats().
-   * Returns false if the ColumnStatisticsData data was incompatible with the given
-   * column type, otherwise returns true.
-   */
-  public boolean update(Type colType, ColumnStatisticsData statsData) {
-    Preconditions.checkState(isSupportedColType(colType));
-    initColStats(colType);
-    boolean isCompatible = false;
-    switch (colType.getPrimitiveType()) {
-      case BOOLEAN:
-        isCompatible = statsData.isSetBooleanStats();
-        if (isCompatible) {
-          BooleanColumnStatsData boolStats = statsData.getBooleanStats();
-          numNulls_ = boolStats.getNumNulls();
-          numDistinctValues_ = (numNulls_ > 0) ? 3 : 2;
-        }
-        break;
-      case TINYINT:
-      case SMALLINT:
-      case INT:
-      case BIGINT:
-      case TIMESTAMP: // Hive and Impala use LongColumnStatsData for timestamps.
-        isCompatible = statsData.isSetLongStats();
-        if (isCompatible) {
-          LongColumnStatsData longStats = statsData.getLongStats();
-          numDistinctValues_ = longStats.getNumDVs();
-          numNulls_ = longStats.getNumNulls();
-        }
-        break;
-      case FLOAT:
-      case DOUBLE:
-        isCompatible = statsData.isSetDoubleStats();
-        if (isCompatible) {
-          DoubleColumnStatsData doubleStats = statsData.getDoubleStats();
-          numDistinctValues_ = doubleStats.getNumDVs();
-          numNulls_ = doubleStats.getNumNulls();
-        }
-        break;
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        isCompatible = statsData.isSetStringStats();
-        if (isCompatible) {
-          StringColumnStatsData stringStats = statsData.getStringStats();
-          numDistinctValues_ = stringStats.getNumDVs();
-          numNulls_ = stringStats.getNumNulls();
-          maxSize_ = stringStats.getMaxColLen();
-          avgSize_ = Double.valueOf(stringStats.getAvgColLen()).floatValue();
-          avgSerializedSize_ = avgSize_ + PrimitiveType.STRING.getSlotSize();
-        }
-        break;
-      case BINARY:
-        isCompatible = statsData.isSetStringStats();
-        if (isCompatible) {
-          BinaryColumnStatsData binaryStats = statsData.getBinaryStats();
-          numNulls_ = binaryStats.getNumNulls();
-          maxSize_ = binaryStats.getMaxColLen();
-          avgSize_ = Double.valueOf(binaryStats.getAvgColLen()).floatValue();
-          avgSerializedSize_ = avgSize_ + PrimitiveType.BINARY.getSlotSize();
-        }
-        break;
-      case DECIMAL:
-        isCompatible = statsData.isSetDecimalStats();
-        if (isCompatible) {
-          DecimalColumnStatsData decimalStats = statsData.getDecimalStats();
-          numNulls_ = decimalStats.getNumNulls();
-          numDistinctValues_ = decimalStats.getNumDVs();
-        }
-        break;
-      default:
-        Preconditions.checkState(false,
-            "Unexpected column type: " + colType.toString());
-        break;
-    }
-    return isCompatible;
-  }
-
-  /**
-   * Sets the member corresponding to the given stats key to 'value'.
-   * Requires that the given value is of a type appropriate for the
-   * member being set. Throws if that is not the case.
-   */
-  public void update(StatsKey key, Number value) {
-    Preconditions.checkNotNull(key);
-    Preconditions.checkNotNull(value);
-    if (key == StatsKey.AVG_SIZE) {
-      Preconditions.checkArgument(value instanceof Float);
-    } else {
-      Preconditions.checkArgument(value instanceof Long);
-    }
-    switch (key) {
-      case NUM_DISTINCT_VALUES: {
-        numDistinctValues_ = (Long) value;
-        break;
-      }
-      case NUM_NULLS: {
-        numNulls_ = (Long) value;
-        break;
-      }
-      case AVG_SIZE: {
-        avgSize_ = (Float) value;
-        break;
-      }
-      case MAX_SIZE: {
-        maxSize_ = (Long) value;
-        break;
-      }
-      default: Preconditions.checkState(false);
-    }
-  }
-
-  /**
-   * Returns true if the given PrimitiveType supports column stats updates.
-   */
-  public static boolean isSupportedColType(Type colType) {
-    if (!colType.isScalarType()) return false;
-    ScalarType scalarType = (ScalarType) colType;
-    return SUPPORTED_COL_TYPES.contains(scalarType.getPrimitiveType());
-  }
-
-  public void update(Type colType, TColumnStats stats) {
-    initColStats(colType);
-    avgSize_ = Double.valueOf(stats.getAvg_size()).floatValue();
-    if (colType.getPrimitiveType() == PrimitiveType.STRING ||
-        colType.getPrimitiveType() == PrimitiveType.BINARY) {
-      avgSerializedSize_ = colType.getSlotSize() + avgSize_;
-    }
-    maxSize_ = stats.getMax_size();
-    numDistinctValues_ = stats.getNum_distinct_values();
-    numNulls_ = stats.getNum_nulls();
-  }
-
-  public TColumnStats toThrift() {
-    TColumnStats colStats = new TColumnStats();
-    colStats.setAvg_size(avgSize_);
-    colStats.setMax_size(maxSize_);
-    colStats.setNum_distinct_values(numDistinctValues_);
-    colStats.setNum_nulls(numNulls_);
-    return colStats;
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this.getClass())
-        .add("avgSerializedSize_", avgSerializedSize_)
-        .add("maxSize_", maxSize_)
-        .add("numDistinct_", numDistinctValues_)
-        .add("numNulls_", numNulls_)
-        .toString();
-  }
-
-  @Override
-  public ColumnStats clone() { return new ColumnStats(this); }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/DataSource.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/DataSource.java b/fe/src/main/java/com/cloudera/impala/catalog/DataSource.java
deleted file mode 100644
index ed0d9ee..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/DataSource.java
+++ /dev/null
@@ -1,88 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import org.apache.hadoop.fs.Path;
-
-import com.cloudera.impala.thrift.TCatalogObjectType;
-import com.cloudera.impala.thrift.TDataSource;
-import com.google.common.base.Objects;
-
-/**
- * Represents a data source in the catalog. Contains the data source name and all
- * information needed to locate and load the data source.
- */
-public class DataSource implements CatalogObject {
-  private final String dataSrcName_;
-  private final String className_;
-  private final String apiVersionString_;
-  // Qualified path to the data source.
-  private final String location_;
-  private long catalogVersion_ =  Catalog.INITIAL_CATALOG_VERSION;
-
-  public DataSource(String dataSrcName, String location, String className,
-      String apiVersionString) {
-    dataSrcName_ = dataSrcName;
-    location_ = location;
-    className_ = className;
-    apiVersionString_ = apiVersionString;
-  }
-
-  public static DataSource fromThrift(TDataSource thrift) {
-    return new DataSource(thrift.getName(), thrift.getHdfs_location(),
-        thrift.getClass_name(), thrift.getApi_version());
-  }
-
-  @Override
-  public TCatalogObjectType getCatalogObjectType() {
-    return TCatalogObjectType.DATA_SOURCE;
-  }
-
-  @Override
-  public long getCatalogVersion() { return catalogVersion_; }
-
-  @Override
-  public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
-
-  @Override
-  public String getName() { return dataSrcName_; }
-
-  @Override
-  public boolean isLoaded() { return true; }
-
-  public String getLocation() { return location_; }
-  public String getClassName() { return className_; }
-  public String getApiVersion() { return apiVersionString_; }
-
-  public TDataSource toThrift() {
-    return new TDataSource(getName(), location_, className_, apiVersionString_);
-  }
-
-  public String debugString() {
-    return Objects.toStringHelper(this)
-        .add("name", dataSrcName_)
-        .add("location", location_)
-        .add("className", className_)
-        .add("apiVersion", apiVersionString_)
-        .toString();
-  }
-
-  public static String debugString(TDataSource thrift) {
-    return fromThrift(thrift).debugString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java b/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java
deleted file mode 100644
index c42c804..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java
+++ /dev/null
@@ -1,259 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.extdatasource.v1.ExternalDataSource;
-import com.cloudera.impala.thrift.TCatalogObjectType;
-import com.cloudera.impala.thrift.TColumn;
-import com.cloudera.impala.thrift.TDataSource;
-import com.cloudera.impala.thrift.TDataSourceTable;
-import com.cloudera.impala.thrift.TResultSet;
-import com.cloudera.impala.thrift.TResultSetMetadata;
-import com.cloudera.impala.thrift.TTable;
-import com.cloudera.impala.thrift.TTableDescriptor;
-import com.cloudera.impala.thrift.TTableType;
-import com.cloudera.impala.util.TResultRowBuilder;
-import com.google.common.base.Preconditions;
-
-/**
- * Represents a table backed by an external data source. All data source properties are
- * stored as table properties (persisted in the metastore) because the DataSource catalog
- * object is not persisted so the DataSource catalog object will not exist if the catalog
- * server is restarted, but the table does not need the DataSource catalog object in
- * order to scan the table. Tables that contain the TBL_PROP_DATA_SRC_NAME table
- * parameter are assumed to be backed by an external data source.
- */
-public class DataSourceTable extends Table {
-  private final static Logger LOG = LoggerFactory.getLogger(DataSourceTable.class);
-
-  /**
-   * Table property key for the data source name.
-   */
-  public static final String TBL_PROP_DATA_SRC_NAME = "__IMPALA_DATA_SOURCE_NAME";
-
-  /**
-   * Table property key for the table init string.
-   */
-  public static final String TBL_PROP_INIT_STRING = "__IMPALA_DATA_SOURCE_INIT_STRING";
-
-  /**
-   * Table property key for the data source library HDFS path.
-   */
-  public static final String TBL_PROP_LOCATION = "__IMPALA_DATA_SOURCE_LOCATION";
-
-  /**
-   * Table property key for the class implementing {@link ExternalDataSource}.
-   */
-  public static final String TBL_PROP_CLASS = "__IMPALA_DATA_SOURCE_CLASS";
-
-  /**
-   * Table property key for the API version implemented by the data source.
-   */
-  public static final String TBL_PROP_API_VER = "__IMPALA_DATA_SOURCE_API_VERSION";
-
-  private String initString_;
-  private TDataSource dataSource_;
-
-  protected DataSourceTable(
-      TableId id, org.apache.hadoop.hive.metastore.api.Table msTable,
-      Db db, String name, String owner) {
-    super(id, msTable, db, name, owner);
-  }
-
-  /**
-   * Gets the the data source.
-   */
-  public TDataSource getDataSource() { return dataSource_; }
-
-  /**
-   * Gets the table init string passed to the data source.
-   */
-  public String getInitString() { return initString_; }
-
-  public int getNumNodes() { return 1; }
-
-  @Override
-  public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; }
-
-  /**
-   * Returns true if the column type is supported.
-   */
-  public static boolean isSupportedColumnType(Type colType) {
-    Preconditions.checkNotNull(colType);
-    return isSupportedPrimitiveType(colType.getPrimitiveType());
-  }
-
-  /**
-   * Returns true if the primitive type is supported.
-   */
-  public static boolean isSupportedPrimitiveType(PrimitiveType primitiveType) {
-    Preconditions.checkNotNull(primitiveType);
-    switch (primitiveType) {
-      case BIGINT:
-      case INT:
-      case SMALLINT:
-      case TINYINT:
-      case DOUBLE:
-      case FLOAT:
-      case BOOLEAN:
-      case STRING:
-      case TIMESTAMP:
-      case DECIMAL:
-        return true;
-      case BINARY:
-      case CHAR:
-      case DATE:
-      case DATETIME:
-      case INVALID_TYPE:
-      case NULL_TYPE:
-      default:
-        return false;
-    }
-  }
-
-  /**
-   * Create columns corresponding to fieldSchemas.
-   * Throws a TableLoadingException if the metadata is incompatible with what we
-   * support.
-   */
-  private void loadColumns(List<FieldSchema> fieldSchemas, IMetaStoreClient client)
-      throws TableLoadingException {
-    int pos = 0;
-    for (FieldSchema s: fieldSchemas) {
-      Column col = new Column(s.getName(), parseColumnType(s), s.getComment(), pos);
-      Preconditions.checkArgument(isSupportedColumnType(col.getType()));
-      addColumn(col);
-      ++pos;
-    }
-  }
-
-  @Override
-  protected void loadFromThrift(TTable thriftTable) throws TableLoadingException {
-    super.loadFromThrift(thriftTable);
-    TDataSourceTable dataSourceTable = thriftTable.getData_source_table();
-    initString_ = dataSourceTable.getInit_string();
-    dataSource_ = dataSourceTable.getData_source();
-  }
-
-  @Override
-  public void load(boolean reuseMetadata, IMetaStoreClient client,
-      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
-    Preconditions.checkNotNull(msTbl);
-    msTable_ = msTbl;
-    clearColumns();
-    LOG.debug("load table: " + db_.getName() + "." + name_);
-    String dataSourceName = getRequiredTableProperty(msTbl, TBL_PROP_DATA_SRC_NAME, null);
-    String location = getRequiredTableProperty(msTbl, TBL_PROP_LOCATION, dataSourceName);
-    String className = getRequiredTableProperty(msTbl, TBL_PROP_CLASS, dataSourceName);
-    String apiVersionString = getRequiredTableProperty(msTbl, TBL_PROP_API_VER,
-        dataSourceName);
-    dataSource_ = new TDataSource(dataSourceName, location, className, apiVersionString);
-    initString_ = getRequiredTableProperty(msTbl, TBL_PROP_INIT_STRING, dataSourceName);
-
-    if (msTbl.getPartitionKeysSize() > 0) {
-      throw new TableLoadingException("Data source table cannot contain clustering " +
-          "columns: " + name_);
-    }
-    numClusteringCols_ = 0;
-
-    try {
-      // Create column objects.
-      List<FieldSchema> fieldSchemas = getMetaStoreTable().getSd().getCols();
-      loadColumns(fieldSchemas, client);
-
-      // Set table stats.
-      numRows_ = getRowCount(super.getMetaStoreTable().getParameters());
-    } catch (Exception e) {
-      throw new TableLoadingException("Failed to load metadata for data source table: " +
-          name_, e);
-    }
-  }
-
-  private String getRequiredTableProperty(
-      org.apache.hadoop.hive.metastore.api.Table msTbl, String key, String dataSourceName)
-      throws TableLoadingException {
-    String val = msTbl.getParameters().get(key);
-    if (val == null) {
-      throw new TableLoadingException(String.format("Failed to load table %s produced " +
-          "by external data source %s. Missing required metadata: %s", name_,
-          dataSourceName == null ? "<unknown>" : dataSourceName, key));
-    }
-    return val;
-  }
-
-  /**
-   * Returns statistics on this table as a tabular result set. Used for the
-   * SHOW TABLE STATS statement. The schema of the returned TResultSet is set
-   * inside this method.
-   */
-  public TResultSet getTableStats() {
-    TResultSet result = new TResultSet();
-    TResultSetMetadata resultSchema = new TResultSetMetadata();
-    resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
-    result.setSchema(resultSchema);
-    TResultRowBuilder rowBuilder = new TResultRowBuilder();
-    rowBuilder.add(numRows_);
-    result.addToRows(rowBuilder.get());
-    return result;
-  }
-
-  @Override
-  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
-    TTableDescriptor tableDesc = new TTableDescriptor(id_.asInt(),
-        TTableType.DATA_SOURCE_TABLE, getTColumnDescriptors(), numClusteringCols_,
-        name_, db_.getName());
-    tableDesc.setDataSourceTable(getDataSourceTable());
-    return tableDesc;
-  }
-
-  /**
-   * Returns a thrift structure representing the table.
-   */
-  @Override
-  public TTable toThrift() {
-    TTable table = super.toThrift();
-    table.setTable_type(TTableType.DATA_SOURCE_TABLE);
-    table.setData_source_table(getDataSourceTable());
-    return table;
-  }
-
-  /**
-   * Returns a thrift {@link TDataSourceTable} structure for the data source table.
-   */
-  private TDataSourceTable getDataSourceTable() {
-    return new TDataSourceTable(dataSource_, initString_);
-  }
-
-  /**
-   * True if the Hive {@link org.apache.hadoop.hive.metastore.api.Table} is a
-   * data source table by checking for the existance of the
-   * TBL_PROP_DATA_SRC_NAME table property.
-   */
-  public static boolean isDataSourceTable(
-      org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    return msTbl.getParameters().containsKey(TBL_PROP_DATA_SRC_NAME);
-  }
-}