You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/19 18:29:05 UTC

[impala] 08/08: IMPALA-7137. Support configuring Frontend to use LocalCatalog

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 800ac7d9f4dbceeb87bbc9e4a464a29fa34f99a4
Author: Todd Lipcon <to...@cloudera.com>
AuthorDate: Wed Jun 6 16:17:49 2018 -0700

    IMPALA-7137. Support configuring Frontend to use LocalCatalog
    
    This adds a new flag --use_local_catalog which is passed through to the
    frontend and causes it to use LocalCatalog instead of ImpaladCatalog.
    Additionally, when this flag is configured, the impalad does not
    subscribe to catalog topic updates from the statestore.
    
    The patch is slightly more complex than simply picking which class to
    instantiate, because the lifecycle is designed a bit differently between
    the two implementations:
    
    - LocalCatalog is instantiated once per query/request.
    
    - ImpaladCatalog is instantiated once and stateful across queries,
      except when a full catalog update is received. This maintains the
      current behavior for this implementation.
    
    In order to abstract this difference in lifecycle from the frontend, I
    introduced a new FeCatalogManager class with different implementations
    for the two lifecycles. I also had to add a simple test implementation
    since some tests rely on directly injecting a Catalog implementation
    into the Frontend.
    
    This patch also includes a few small changes to the local catalog
    implementation objects to enable the impalad to start and accept
    connections. With this patch, I was able to manually test as follows:
    
    I started just the statestore and the impalad in the new mode:
    
    - ./bin/start-statestored.sh
    - ./bin/start-impalad.sh --use_local_catalog
    
    I connected with impala-shell as usual and was able to run the most
    simple queries:
    
    - SHOW DATABASES;
    - USE functional;
    - SHOW TABLES;
    
    All other functionality results in error messages due to the various
    TODOs in the current skeleton implementation.
    
    Change-Id: I8c9665bd031d23608740b23eef301970af9aa764
    Reviewed-on: http://gerrit.cloudera.org:8080/10629
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>
---
 be/src/runtime/exec-env.cc                         |   4 +
 be/src/service/impala-server.cc                    |   3 +-
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../apache/impala/catalog/local/LocalCatalog.java  |   7 +-
 .../org/apache/impala/catalog/local/LocalDb.java   |   4 +-
 .../impala/service/DescribeResultFactory.java      |  12 +-
 .../apache/impala/service/FeCatalogManager.java    | 157 +++++++++++++++++++++
 .../java/org/apache/impala/service/Frontend.java   | 104 +++++++-------
 .../org/apache/impala/service/JniFrontend.java     |   2 +-
 .../java/org/apache/impala/service/MetadataOp.java |   8 +-
 11 files changed, 241 insertions(+), 64 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 1a38bc7..1322bd8 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -84,6 +84,10 @@ DEFINE_bool(disable_admission_control, false, "Disables admission control.");
 DEFINE_bool(use_krpc, true, "If true, use KRPC for the DataStream subsystem. "
     "Otherwise use Thrift RPC.");
 
+DEFINE_bool_hidden(use_local_catalog, false,
+  "Use experimental implementation of a local catalog. If this is set, "
+  "the catalog service is not used and does not need to be started.");
+
 DECLARE_int32(state_store_port);
 DECLARE_int32(num_threads_per_core);
 DECLARE_int32(num_cores);
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 2026842..ce6c2c3 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -119,6 +119,7 @@ DECLARE_string(authorized_proxy_group_config_delimiter);
 DECLARE_bool(abort_on_config_error);
 DECLARE_bool(disk_spill_encryption);
 DECLARE_bool(use_krpc);
+DECLARE_bool(use_local_catalog);
 
 DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served");
 DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 client requests are served");
@@ -364,7 +365,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
     ABORT_IF_ERROR(
         exec_env->subscriber()->AddTopic(Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb));
 
-    if (FLAGS_is_coordinator) {
+    if (FLAGS_is_coordinator && !FLAGS_use_local_catalog) {
       auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
           vector<TTopicDelta>* topic_updates) {
         this->CatalogUpdateCallback(state, topic_updates);
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index acbdc6f..d8d945a 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -28,6 +28,7 @@ DECLARE_bool(load_catalog_in_background);
 DECLARE_bool(load_auth_to_local_rules);
 DECLARE_bool(enable_stats_extrapolation);
 DECLARE_bool(enable_orc_scanner);
+DECLARE_bool(use_local_catalog);
 DECLARE_int32(non_impala_java_vlog);
 DECLARE_int32(num_metadata_loading_threads);
 DECLARE_int32(max_hdfs_partitions_parallel_load);
@@ -60,6 +61,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_authorization_policy_file(FLAGS_authorization_policy_file);
   cfg.__set_load_catalog_in_background(FLAGS_load_catalog_in_background);
   cfg.__set_enable_orc_scanner(FLAGS_enable_orc_scanner);
+  cfg.__set_use_local_catalog(FLAGS_use_local_catalog);
   cfg.__set_server_name(FLAGS_server_name);
   cfg.__set_sentry_config(FLAGS_sentry_config);
   cfg.__set_authorization_policy_provider_class(
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index f645e3c..c8c0dea 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -77,4 +77,6 @@ struct TBackendGflags {
   25: required bool enable_orc_scanner
 
   26: required string authorized_proxy_group_config
+
+  27: required bool use_local_catalog
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index beac563..2b246e6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -68,6 +68,11 @@ public class LocalCatalog implements FeCatalog {
   private Map<String, FeDb> dbs_ = Maps.newHashMap();
   private static final Db builtinsDb_ = new BuiltinsDb(ImpaladCatalog.BUILTINS_DB);
 
+  public static FeCatalog create(String defaultKuduMasterHosts) {
+    // TODO(todd): store the kudu master hosts
+    return new LocalCatalog(new DirectMetaProvider());
+  }
+
   public LocalCatalog(MetaProvider metaProvider) {
     metaProvider_ = Preconditions.checkNotNull(metaProvider);
   }
@@ -187,7 +192,7 @@ public class LocalCatalog implements FeCatalog {
 
   @Override
   public AuthorizationPolicy getAuthPolicy() {
-    throw new UnsupportedOperationException("TODO");
+    return null; // TODO(todd): implement auth policy
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
index 1bbe9b8..b3bf010 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
@@ -168,6 +168,8 @@ class LocalDb implements FeDb {
 
   @Override
   public TDatabase toThrift() {
-    throw new UnsupportedOperationException("TODO");
+    TDatabase tdb = new TDatabase(name_);
+    tdb.setMetastore_db(getMetaStoreDb());
+    return tdb;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java b/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java
index 9a5adf4..d1081c6 100644
--- a/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java
+++ b/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java
@@ -27,11 +27,11 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo;
 import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils;
 import org.apache.impala.catalog.Column;
-import org.apache.impala.catalog.Db;
+import org.apache.impala.catalog.FeDb;
+import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
-import org.apache.impala.catalog.Table;
 import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TDescribeResult;
@@ -51,7 +51,7 @@ public class DescribeResultFactory {
   // Empty column used to format description output table.
   private final static TColumnValue EMPTY = new TColumnValue().setString_val("");
 
-  public static TDescribeResult buildDescribeDbResult(Db db,
+  public static TDescribeResult buildDescribeDbResult(FeDb db,
     TDescribeOutputStyle outputFormat) {
     switch (outputFormat) {
       case MINIMAL: return describeDbMinimal(db);
@@ -67,7 +67,7 @@ public class DescribeResultFactory {
    * Builds results for a DESCRIBE DATABASE <db> command. This consists of the database
    * location and comment.
    */
-  private static TDescribeResult describeDbMinimal(Db db) {
+  private static TDescribeResult describeDbMinimal(FeDb db) {
     TDescribeResult descResult = new TDescribeResult();
 
     org.apache.hadoop.hive.metastore.api.Database msDb = db.getMetaStoreDb();
@@ -122,7 +122,7 @@ public class DescribeResultFactory {
    * Builds a TDescribeResult that contains the result of a DESCRIBE FORMATTED|EXTENDED
    * DATABASE <db> command. Output all the database's properties.
    */
-  private static TDescribeResult describeDbExtended(Db db) {
+  private static TDescribeResult describeDbExtended(FeDb db) {
     TDescribeResult descResult = describeDbMinimal(db);
     org.apache.hadoop.hive.metastore.api.Database msDb = db.getMetaStoreDb();
     String ownerName = null;
@@ -184,7 +184,7 @@ public class DescribeResultFactory {
    * Hive's MetadataFormatUtils class is used to build the results.  filteredColumns is a
    * list of columns the user is authorized to view.
    */
-  public static TDescribeResult buildDescribeFormattedResult(Table table,
+  public static TDescribeResult buildDescribeFormattedResult(FeTable table,
       List<Column> filteredColumns) {
     TDescribeResult result = new TDescribeResult();
     result.results = Lists.newArrayList();
diff --git a/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java b/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
new file mode 100644
index 0000000..2bd44a4
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.service;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.FeCatalog;
+import org.apache.impala.catalog.ImpaladCatalog;
+import org.apache.impala.catalog.local.LocalCatalog;
+import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
+import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
+import org.apache.thrift.TException;
+
+/**
+ * Manages the Catalog implementation used by the frontend.
+ *
+ * This class abstracts away the different lifecycles used by the LocalCatalog
+ * and the ImpaladCatalog. The former creates a new instance for each request or
+ * query, whereas the latter only creates a new instance upon receiving a full update
+ * from the catalogd via the statestore.
+ */
+public abstract class FeCatalogManager {
+  private static final String DEFAULT_KUDU_MASTER_HOSTS =
+      BackendConfig.INSTANCE.getBackendCfg().kudu_master_hosts; // read once at class init
+
+  /**
+   * Picks the manager implementation selected by the backend's use_local_catalog
+   * setting: LocalImpl when it is set, CatalogdImpl otherwise.
+   */
+  public static FeCatalogManager createFromBackendConfig() {
+    boolean useLocalCatalog =
+        BackendConfig.INSTANCE.getBackendCfg().use_local_catalog;
+    FeCatalogManager manager =
+        useLocalCatalog ? new LocalImpl() : new CatalogdImpl();
+    return manager;
+  }
+
+  /**
+   * Test-only factory: every request sees 'testCatalog'; no statestore updates.
+   */
+  public static FeCatalogManager createForTests(FeCatalog testCatalog) {
+    FeCatalogManager fixedManager = new TestImpl(testCatalog);
+    return fixedManager;
+  }
+
+  /**
+   * @return a Catalog instance to be used for a request or query. The catalogd-based
+   * and test implementations return the instance they currently hold, whereas the
+   * local implementation creates a new one on every call.
+   */
+  abstract FeCatalog getOrCreateCatalog();
+
+  /**
+   * Update the Catalog based on an update from the state store. Only supported by
+   * the catalogd-based implementation; the others throw IllegalStateException.
+   */
+  abstract TUpdateCatalogCacheResponse updateCatalogCache(
+      TUpdateCatalogCacheRequest req) throws CatalogException, TException;
+
+  /**
+   * Manager backed by a long-lived ImpaladCatalog that is fed by statestore
+   * updates. The instance is reused across requests and is only swapped for a
+   * new one when a full (non-delta) update arrives.
+   */
+  private static class CatalogdImpl extends FeCatalogManager {
+    // Currently published catalog; replaced atomically on full updates.
+    private final AtomicReference<ImpaladCatalog> catalog_ = new AtomicReference<>();
+
+    private CatalogdImpl() {
+      catalog_.set(createNewCatalog());
+    }
+
+    @Override
+    FeCatalog getOrCreateCatalog() {
+      return catalog_.get();
+    }
+
+    @Override
+    TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req)
+        throws CatalogException, TException {
+      if (req.is_delta) {
+        // A delta is applied directly to the catalog that is currently published.
+        return catalog_.get().updateCatalog(req);
+      }
+
+      // A full update supersedes the existing contents, so it is loaded into a
+      // brand-new catalog rather than into the published one.
+      ImpaladCatalog freshCatalog = createNewCatalog();
+      TUpdateCatalogCacheResponse resp = freshCatalog.updateCatalog(req);
+
+      // Publish the new catalog only after it has been populated so that clients
+      // never observe the catalog disappearing. updateCatalog() has a postcondition
+      // of isReady() == true, so the published catalog is guaranteed to be ready.
+      catalog_.set(freshCatalog);
+      return resp;
+    }
+
+    private ImpaladCatalog createNewCatalog() {
+      return new ImpaladCatalog(DEFAULT_KUDU_MASTER_HOSTS);
+    }
+  }
+
+  /**
+   * LocalCatalog-backed implementation: each request or query gets a new instance.
+   */
+  private static class LocalImpl extends FeCatalogManager {
+    @Override
+    FeCatalog getOrCreateCatalog() {
+      FeCatalog freshCatalog = LocalCatalog.create(DEFAULT_KUDU_MASTER_HOSTS);
+      return freshCatalog;
+    }
+
+    @Override
+    TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req) {
+      String msg = "Unexpected call to updateCatalogCache() with local catalog enabled";
+      throw new IllegalStateException(msg);
+    }
+  }
+
+  /**
+   * Test-only implementation that always hands out the catalog it was built with
+   * and rejects any update coming from the statestore.
+   */
+  private static class TestImpl extends FeCatalogManager {
+    private final FeCatalog fixedCatalog_;
+
+    TestImpl(FeCatalog testCatalog) {
+      fixedCatalog_ = testCatalog;
+    }
+
+    @Override
+    FeCatalog getOrCreateCatalog() {
+      return fixedCatalog_;
+    }
+
+    @Override
+    TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req) {
+      String msg = "Unexpected call to updateCatalogCache() with a test catalog instance";
+      throw new IllegalStateException(msg);
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index ad51286..ab330fd 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -76,17 +76,16 @@ import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.DatabaseNotFoundException;
-import org.apache.impala.catalog.Db;
+import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.catalog.FeDataSource;
 import org.apache.impala.catalog.FeDataSourceTable;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HBaseTable;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.KuduTable;
-import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
@@ -146,6 +145,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
@@ -163,39 +163,53 @@ public class Frontend {
   // Max time to wait for a catalog update notification.
   public static final long MAX_CATALOG_UPDATE_WAIT_TIME_MS = 2 * 1000;
 
-  //TODO: Make the reload interval configurable.
+  // TODO: Make the reload interval configurable.
   private static final int AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS = 5 * 60;
 
-  private final AtomicReference<ImpaladCatalog> impaladCatalog_ =
-      new AtomicReference<ImpaladCatalog>();
+  private final FeCatalogManager catalogManager_;
   private final AuthorizationConfig authzConfig_;
-  private final AtomicReference<AuthorizationChecker> authzChecker_;
+  /**
+   * Authorization checker. Initialized and periodically loaded by a task
+   * running on the {@link #policyReader_} thread.
+   */
+  private final AtomicReference<AuthorizationChecker> authzChecker_ =
+      new AtomicReference<>();
   private final ScheduledExecutorService policyReader_ =
       Executors.newScheduledThreadPool(1);
-  private final String defaultKuduMasterHosts_;
 
-  public Frontend(AuthorizationConfig authorizationConfig,
-      String defaultKuduMasterHosts) {
-    this(authorizationConfig, new ImpaladCatalog(defaultKuduMasterHosts));
+  public Frontend(AuthorizationConfig authorizationConfig) {
+    this(authorizationConfig, FeCatalogManager.createFromBackendConfig());
   }
 
   /**
-   * C'tor used by tests to pass in a custom ImpaladCatalog.
+   * Create a frontend with a specific catalog instance which will not allow
+   * updates and will be used for all requests.
    */
-  public Frontend(AuthorizationConfig authorizationConfig, ImpaladCatalog catalog) {
+  @VisibleForTesting
+  public Frontend(AuthorizationConfig authorizationConfig,
+      FeCatalog testCatalog) {
+    this(authorizationConfig, FeCatalogManager.createForTests(testCatalog));
+  }
+
+  private Frontend(AuthorizationConfig authorizationConfig,
+      FeCatalogManager catalogManager) {
     authzConfig_ = authorizationConfig;
-    impaladCatalog_.set(catalog);
-    defaultKuduMasterHosts_ = catalog.getDefaultKuduMasterHosts();
-    authzChecker_ = new AtomicReference<AuthorizationChecker>(
-        new AuthorizationChecker(authzConfig_, impaladCatalog_.get().getAuthPolicy()));
+    catalogManager_ = catalogManager;
+
+    // Load the authorization policy once at startup, initializing
+    // authzChecker_. This ensures that, if the policy fails to load,
+    // we will throw an exception and fail to start.
+    AuthorizationPolicyReader policyReaderTask =
+        new AuthorizationPolicyReader(authzConfig_);
+    policyReaderTask.run();
+
     // If authorization is enabled, reload the policy on a regular basis.
     if (authzConfig_.isEnabled() && authzConfig_.isFileBasedPolicy()) {
       // Stagger the reads across nodes
       Random randomGen = new Random(UUID.randomUUID().hashCode());
       int delay = AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS + randomGen.nextInt(60);
 
-      policyReader_.scheduleAtFixedRate(
-          new AuthorizationPolicyReader(authzConfig_),
+      policyReader_.scheduleAtFixedRate(policyReaderTask,
           delay, AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS, TimeUnit.SECONDS);
     }
   }
@@ -222,26 +236,16 @@ public class Frontend {
     }
   }
 
-  public ImpaladCatalog getCatalog() { return impaladCatalog_.get(); }
+  public FeCatalog getCatalog() { return catalogManager_.getOrCreateCatalog(); }
+
   public AuthorizationChecker getAuthzChecker() { return authzChecker_.get(); }
 
   public TUpdateCatalogCacheResponse updateCatalogCache(
       TUpdateCatalogCacheRequest req) throws CatalogException, TException {
-    if (req.is_delta) return impaladCatalog_.get().updateCatalog(req);
-
-    // If this is not a delta, this update should replace the current
-    // Catalog contents so create a new catalog and populate it.
-    ImpaladCatalog catalog = new ImpaladCatalog(defaultKuduMasterHosts_);
-
-    TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);
-
-    // Now that the catalog has been updated, replace the references to
-    // impaladCatalog_/authzChecker_. This ensures that clients don't see the catalog
-    // disappear. The catalog is guaranteed to be ready since updateCatalog() has a
-    // postcondition of isReady() == true.
-    impaladCatalog_.set(catalog);
-    authzChecker_.set(new AuthorizationChecker(authzConfig_, catalog.getAuthPolicy()));
-    return response;
+    TUpdateCatalogCacheResponse resp = catalogManager_.updateCatalogCache(req);
+    if (!req.is_delta) authzChecker_.set(  // Deltas keep the same catalog instance.
+        new AuthorizationChecker(authzConfig_, getCatalog().getAuthPolicy()));
+    return resp;
   }
 
   /**
@@ -555,7 +559,7 @@ public class Frontend {
     // Get the destination for the load. If the load is targeting a partition,
     // this the partition location. Otherwise this is the table location.
     String destPathString = null;
-    ImpaladCatalog catalog = impaladCatalog_.get();
+    FeCatalog catalog = getCatalog();
     if (request.isSetPartition_spec()) {
       destPathString = catalog.getHdfsPartition(tableName.getDb(),
           tableName.getTbl(), request.getPartition_spec()).getLocation();
@@ -616,7 +620,7 @@ public class Frontend {
    */
   public List<String> getTableNames(String dbName, PatternMatcher matcher,
       User user) throws ImpalaException {
-    List<String> tblNames = impaladCatalog_.get().getTableNames(dbName, matcher);
+    List<String> tblNames = getCatalog().getTableNames(dbName, matcher);
     if (authzConfig_.isEnabled()) {
       Iterator<String> iter = tblNames.iterator();
       while (iter.hasNext()) {
@@ -635,7 +639,7 @@ public class Frontend {
    * Returns a list of columns of a table using 'matcher' and are accessible
    * to the given user.
    */
-  public List<Column> getColumns(Table table, PatternMatcher matcher,
+  public List<Column> getColumns(FeTable table, PatternMatcher matcher,
       User user) throws InternalException {
     Preconditions.checkNotNull(table);
     Preconditions.checkNotNull(matcher);
@@ -660,7 +664,7 @@ public class Frontend {
    */
   public List<? extends FeDb> getDbs(PatternMatcher matcher, User user)
       throws InternalException {
-    List<? extends FeDb> dbs = impaladCatalog_.get().getDbs(matcher);
+    List<? extends FeDb> dbs = getCatalog().getDbs(matcher);
     // If authorization is enabled, filter out the databases the user does not
     // have permissions on.
     if (authzConfig_.isEnabled()) {
@@ -692,7 +696,7 @@ public class Frontend {
    * matches all data sources.
    */
   public List<? extends FeDataSource> getDataSrcs(String pattern) {
-    return impaladCatalog_.get().getDataSources(
+    return getCatalog().getDataSources(
         PatternMatcher.createHivePatternMatcher(pattern));
   }
 
@@ -701,7 +705,7 @@ public class Frontend {
    */
   public TResultSet getColumnStats(String dbName, String tableName)
       throws ImpalaException {
-    Table table = impaladCatalog_.get().getTable(dbName, tableName);
+    FeTable table = getCatalog().getTable(dbName, tableName);
     TResultSet result = new TResultSet();
     TResultSetMetadata resultSchema = new TResultSetMetadata();
     result.setSchema(resultSchema);
@@ -729,7 +733,7 @@ public class Frontend {
    */
   public TResultSet getTableStats(String dbName, String tableName, TShowStatsOp op)
       throws ImpalaException {
-    Table table = impaladCatalog_.get().getTable(dbName, tableName);
+    FeTable table = getCatalog().getTable(dbName, tableName);
     if (table instanceof FeFsTable) {
       return ((FeFsTable) table).getTableStats();
     } else if (table instanceof HBaseTable) {
@@ -755,7 +759,7 @@ public class Frontend {
   public List<Function> getFunctions(TFunctionCategory category,
       String dbName, String fnPattern, boolean exactMatch)
       throws DatabaseNotFoundException {
-    Db db = impaladCatalog_.get().getDb(dbName);
+    FeDb db = getCatalog().getDb(dbName);
     if (db == null) {
       throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
     }
@@ -783,7 +787,7 @@ public class Frontend {
    */
   public TDescribeResult describeDb(String dbName, TDescribeOutputStyle outputStyle)
       throws ImpalaException {
-    Db db = impaladCatalog_.get().getDb(dbName);
+    FeDb db = getCatalog().getDb(dbName);
     return DescribeResultFactory.buildDescribeDbResult(db, outputStyle);
   }
 
@@ -794,7 +798,7 @@ public class Frontend {
    */
   public TDescribeResult describeTable(TTableName tableName,
       TDescribeOutputStyle outputStyle, User user) throws ImpalaException {
-    Table table = impaladCatalog_.get().getTable(tableName.db_name, tableName.table_name);
+    FeTable table = getCatalog().getTable(tableName.db_name, tableName.table_name);
     List<Column> filteredColumns;
     if (authzConfig_.isEnabled()) {
       // First run a table check
@@ -1120,14 +1124,14 @@ public class Frontend {
 
       // create finalization params of insert stmt
       InsertStmt insertStmt = analysisResult.getInsertStmt();
-      if (insertStmt.getTargetTable() instanceof HdfsTable) {
+      if (insertStmt.getTargetTable() instanceof FeFsTable) {
         TFinalizeParams finalizeParams = new TFinalizeParams();
         finalizeParams.setIs_overwrite(insertStmt.isOverwrite());
         finalizeParams.setTable_name(insertStmt.getTargetTableName().getTbl());
         finalizeParams.setTable_id(DescriptorTable.TABLE_SINK_ID);
         String db = insertStmt.getTargetTableName().getDb();
         finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
-        HdfsTable hdfsTable = (HdfsTable) insertStmt.getTargetTable();
+        FeFsTable hdfsTable = (FeFsTable) insertStmt.getTargetTable();
         finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
         finalizeParams.setStaging_dir(
             hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
@@ -1208,10 +1212,10 @@ public class Frontend {
    */
   public TResultSet getTableFiles(TShowFilesParams request)
       throws ImpalaException{
-    Table table = impaladCatalog_.get().getTable(request.getTable_name().getDb_name(),
+    FeTable table = getCatalog().getTable(request.getTable_name().getDb_name(),
         request.getTable_name().getTable_name());
-    if (table instanceof HdfsTable) {
-      return ((HdfsTable) table).getFiles(request.getPartition_set());
+    if (table instanceof FeFsTable) {
+      return ((FeFsTable) table).getFiles(request.getPartition_set());
     } else {
       throw new InternalException("SHOW FILES only supports Hdfs table. " +
           "Unsupported table class: " + table.getClass());
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index d8ef912..25dee1c 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -148,7 +148,7 @@ public class JniFrontend {
     }
     LOG.info(JniUtil.getJavaVersion());
 
-    frontend_ = new Frontend(authConfig, cfg.kudu_master_hosts);
+    frontend_ = new Frontend(authConfig);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/service/MetadataOp.java b/fe/src/main/java/org/apache/impala/service/MetadataOp.java
index ea4864f..d221e2c 100644
--- a/fe/src/main/java/org/apache/impala/service/MetadataOp.java
+++ b/fe/src/main/java/org/apache/impala/service/MetadataOp.java
@@ -28,12 +28,12 @@ import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.User;
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.catalog.FeDb;
+import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
-import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.catalog.ScalarType;
-import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.thrift.TColumn;
@@ -266,7 +266,7 @@ public class MetadataOp {
       return result;
     }
 
-    ImpaladCatalog catalog = fe.getCatalog();
+    FeCatalog catalog = fe.getCatalog();
     for (FeDb db: fe.getDbs(schemaPatternMatcher, user)) {
       if (fnPatternMatcher != PatternMatcher.MATCHER_MATCH_NONE) {
         // Get function metadata
@@ -279,7 +279,7 @@ public class MetadataOp {
         List<String> tableComments = Lists.newArrayList();
         List<String> tableTypes = Lists.newArrayList();
         for (String tabName: fe.getTableNames(db.getName(), tablePatternMatcher, user)) {
-          Table table = catalog.getTable(db.getName(), tabName);
+          FeTable table = catalog.getTable(db.getName(), tabName);
           if (table == null) continue;
 
           String comment = null;