You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/04/24 22:48:28 UTC

[impala] 03/03: IMPALA-9609: Minimize Frontend activity in executor only impalads

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 58273fff601dcc763ac43f7cc275a174a2e18b6b
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Fri Apr 24 00:50:39 2020 +0200

    IMPALA-9609: Minimize Frontend activity in executor only impalads
    
    Until now the Frontend started fully regardless of the is_coordinator
    flag, e.g. it created connections to the HMS, which is both error prone
    and can DoS the metastore. (Note that even coordinators started to
    connect to the HMS only recently, in relation to local catalog mode and
    ACID transactions.)
    
    Executor-only impalads still need a JVM, as queries can contain
    Java calls (HDFS/HBase API calls, Hive UDFs), but most of the JNI API
    provided by JniFrontend shouldn't be called by executors. It seems
    that the whole Frontend object is needed only by coordinators.
    
    Testing:
    - executor-only mode generally doesn't seem to be well covered
    - ran test_coordinators.py, which has some tests with executor-only
      impalads
    - added a new test for HBase tables (Hive UDFs and HDFS were already
      covered)
    
    Change-Id: I4627e5e3520175153cb49e24fd480815dfefdae1
    Reviewed-on: http://gerrit.cloudera.org:8080/15793
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/exec-env.cc                         | 11 ++++--
 .../java/org/apache/impala/service/Frontend.java   |  7 ----
 .../org/apache/impala/service/JniFrontend.java     | 44 +++++++++++++++++++---
 tests/custom_cluster/test_coordinators.py          | 18 +++++++++
 4 files changed, 63 insertions(+), 17 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index a7bbaf7..c6e7844 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -284,6 +284,7 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
   cluster_membership_mgr_.reset(new ClusterMembershipMgr(
       PrintId(backend_id_), statestore_subscriber_.get(), metrics_.get()));
 
+  // TODO: Consider removing AdmissionController from executor only impalads.
   admission_controller_.reset(
       new AdmissionController(cluster_membership_mgr_.get(), statestore_subscriber_.get(),
           request_pool_service_.get(), metrics_.get(), configured_backend_address_));
@@ -430,10 +431,12 @@ Status ExecEnv::Init() {
   }
 
   RETURN_IF_ERROR(cluster_membership_mgr_->Init());
-  cluster_membership_mgr_->RegisterUpdateCallbackFn(
-      [this](ClusterMembershipMgr::SnapshotPtr snapshot) {
-        SendClusterMembershipToFrontend(snapshot, this->frontend());
-      });
+  if (FLAGS_is_coordinator) {
+    cluster_membership_mgr_->RegisterUpdateCallbackFn(
+        [this](ClusterMembershipMgr::SnapshotPtr snapshot) {
+          SendClusterMembershipToFrontend(snapshot, this->frontend());
+        });
+  }
 
   RETURN_IF_ERROR(admission_controller_->Init());
 
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index fb7ca82..1715233 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -372,13 +372,6 @@ public class Frontend {
   }
 
   /**
-   * Update the cluster membership snapshot with the latest snapshot from the backend.
-   */
-  public void updateExecutorMembership(TUpdateExecutorMembershipRequest req) {
-    ExecutorMembershipSnapshot.update(req);
-  }
-
-  /**
    * Constructs a TCatalogOpRequest and attaches it, plus any metadata, to the
    * result argument.
    */
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 2d47268..b572f8a 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -95,6 +95,7 @@ import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
 import org.apache.impala.util.AuthorizationUtil;
+import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.apache.impala.util.GlogAppender;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.TSessionStateUtil;
@@ -134,11 +135,16 @@ public class JniFrontend {
 
     GlogAppender.Install(TLogLevel.values()[cfg.impala_log_lvl],
         TLogLevel.values()[cfg.non_impala_java_vlog]);
-
-    final AuthorizationFactory authzFactory =
-        AuthorizationUtil.authzFactoryFrom(BackendConfig.INSTANCE);
     LOG.info(JniUtil.getJavaVersion());
-    frontend_ = new Frontend(authzFactory);
+
+    if (cfg.is_coordinator) {
+      final AuthorizationFactory authzFactory =
+          AuthorizationUtil.authzFactoryFrom(BackendConfig.INSTANCE);
+      frontend_ = new Frontend(authzFactory);
+    } else {
+      // Avoid instantiating Frontend in executor only impalads.
+      frontend_ = null;
+    }
   }
 
   /**
@@ -147,6 +153,7 @@ public class JniFrontend {
    */
   public byte[] createExecRequest(byte[] thriftQueryContext)
       throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TQueryCtx queryCtx = new TQueryCtx();
     JniUtil.deserializeThrift(protocolFactory_, queryCtx, thriftQueryContext);
 
@@ -168,6 +175,7 @@ public class JniFrontend {
 
   // Deserialize and merge each thrift catalog update into a single merged update
   public byte[] updateCatalogCache(byte[] req) throws ImpalaException, TException {
+    Preconditions.checkNotNull(frontend_);
     TUpdateCatalogCacheRequest request = new TUpdateCatalogCacheRequest();
     JniUtil.deserializeThrift(protocolFactory_, request, req);
     return new TSerializer(protocolFactory_).serialize(
@@ -182,7 +190,7 @@ public class JniFrontend {
       throws ImpalaException {
     TUpdateExecutorMembershipRequest req = new TUpdateExecutorMembershipRequest();
     JniUtil.deserializeThrift(protocolFactory_, req, thriftMembershipUpdate);
-    frontend_.updateExecutorMembership(req);
+    ExecutorMembershipSnapshot.update(req);
   }
 
   /**
@@ -193,6 +201,7 @@ public class JniFrontend {
    */
   public byte[] loadTableData(byte[] thriftLoadTableDataParams)
       throws ImpalaException, IOException {
+    Preconditions.checkNotNull(frontend_);
     TLoadDataReq request = new TLoadDataReq();
     JniUtil.deserializeThrift(protocolFactory_, request, thriftLoadTableDataParams);
     TLoadDataResp response = frontend_.loadTableData(request);
@@ -209,6 +218,7 @@ public class JniFrontend {
    * This call is thread-safe.
    */
   public String getExplainPlan(byte[] thriftQueryContext) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TQueryCtx queryCtx = new TQueryCtx();
     JniUtil.deserializeThrift(protocolFactory_, queryCtx, thriftQueryContext);
     String plan = frontend_.getExplainString(queryCtx);
@@ -217,6 +227,7 @@ public class JniFrontend {
   }
 
   public byte[] getCatalogMetrics() throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TGetCatalogMetricsResult metrics = frontend_.getCatalogMetrics();
     TSerializer serializer = new TSerializer(protocolFactory_);
     try {
@@ -240,6 +251,7 @@ public class JniFrontend {
    * @see Frontend#getTableNames
    */
   public byte[] getTableNames(byte[] thriftGetTablesParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TGetTablesParams params = new TGetTablesParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftGetTablesParams);
     // If the session was not set it indicates this is an internal Impala call.
@@ -269,6 +281,7 @@ public class JniFrontend {
    * @see Frontend#getTableFiles
    */
   public byte[] getTableFiles(byte[] thriftShowFilesParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TShowFilesParams params = new TShowFilesParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftShowFilesParams);
     TResultSet result = frontend_.getTableFiles(params);
@@ -291,6 +304,7 @@ public class JniFrontend {
    * @see Frontend#getDbs
    */
   public byte[] getDbs(byte[] thriftGetTablesParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TGetDbsParams params = new TGetDbsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftGetTablesParams);
     // If the session was not set it indicates this is an internal Impala call.
@@ -318,6 +332,7 @@ public class JniFrontend {
    * @see Frontend#getDataSrcs
    */
   public byte[] getDataSrcMetadata(byte[] thriftParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TGetDataSrcsParams params = new TGetDataSrcsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftParams);
 
@@ -342,6 +357,7 @@ public class JniFrontend {
   }
 
   public byte[] getStats(byte[] thriftShowStatsParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TShowStatsParams params = new TShowStatsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftShowStatsParams);
     Preconditions.checkState(params.isSetTable_name());
@@ -369,6 +385,7 @@ public class JniFrontend {
    * @see Frontend#getTableNames
    */
   public byte[] getFunctions(byte[] thriftGetFunctionsParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TGetFunctionsParams params = new TGetFunctionsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftGetFunctionsParams);
 
@@ -402,6 +419,7 @@ public class JniFrontend {
    */
   public byte[] getCatalogObject(byte[] thriftParams) throws ImpalaException,
       TException {
+    Preconditions.checkNotNull(frontend_);
     TCatalogObject objectDescription = new TCatalogObject();
     JniUtil.deserializeThrift(protocolFactory_, objectDescription, thriftParams);
     TSerializer serializer = new TSerializer(protocolFactory_);
@@ -416,6 +434,7 @@ public class JniFrontend {
    * @see Frontend#describeDb
    */
   public byte[] describeDb(byte[] thriftDescribeDbParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TDescribeDbParams params = new TDescribeDbParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftDescribeDbParams);
 
@@ -437,6 +456,7 @@ public class JniFrontend {
    * @see Frontend#describeTable
    */
   public byte[] describeTable(byte[] thriftDescribeTableParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TDescribeTableParams params = new TDescribeTableParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftDescribeTableParams);
 
@@ -464,6 +484,7 @@ public class JniFrontend {
    */
   public String showCreateTable(byte[] thriftTableName)
       throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TTableName params = new TTableName();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftTableName);
     return ToSqlUtils.getCreateTableSql(frontend_.getCatalog().getTable(
@@ -475,6 +496,7 @@ public class JniFrontend {
    */
   public String showCreateFunction(byte[] thriftShowCreateFunctionParams)
       throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TGetFunctionsParams params = new TGetFunctionsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftShowCreateFunctionParams);
     Preconditions.checkArgument(params.category == TFunctionCategory.SCALAR ||
@@ -506,6 +528,7 @@ public class JniFrontend {
    * Gets all roles.
    */
   public byte[] getRoles(byte[] showRolesParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TShowRolesParams params = new TShowRolesParams();
     JniUtil.deserializeThrift(protocolFactory_, params, showRolesParams);
     TSerializer serializer = new TSerializer(protocolFactory_);
@@ -521,6 +544,7 @@ public class JniFrontend {
    */
   public byte[] getPrincipalPrivileges(byte[] showGrantPrincipalParams)
       throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TShowGrantPrincipalParams params = new TShowGrantPrincipalParams();
     JniUtil.deserializeThrift(protocolFactory_, params, showGrantPrincipalParams);
     TSerializer serializer = new TSerializer(protocolFactory_);
@@ -536,6 +560,7 @@ public class JniFrontend {
    */
   public byte[] execHiveServer2MetadataOp(byte[] metadataOpsParams)
       throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TMetadataOpRequest params = new TMetadataOpRequest();
     JniUtil.deserializeThrift(protocolFactory_, params, metadataOpsParams);
     TResultSet result = frontend_.execHiveServer2MetadataOp(params);
@@ -549,10 +574,14 @@ public class JniFrontend {
   }
 
   public void setCatalogIsReady() {
+    Preconditions.checkNotNull(frontend_);
     frontend_.getCatalog().setIsReady(true);
   }
 
-  public void waitForCatalog() { frontend_.waitForCatalog(); }
+  public void waitForCatalog() {
+    Preconditions.checkNotNull(frontend_);
+    frontend_.waitForCatalog();
+  }
 
   // Caching this saves ~50ms per call to getHadoopConfigAsHtml
   private static final Configuration CONF = new Configuration();
@@ -629,6 +658,7 @@ public class JniFrontend {
    * @param serializedRequest
    */
   public void callQueryCompleteHooks(byte[] serializedRequest) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     final TQueryCompleteContext request = new TQueryCompleteContext();
     JniUtil.deserializeThrift(protocolFactory_, request, serializedRequest);
 
@@ -643,6 +673,7 @@ public class JniFrontend {
    * @throws TransactionException
    */
   public void abortTransaction(long transactionId) throws TransactionException {
+    Preconditions.checkNotNull(frontend_);
     this.frontend_.abortTransaction(transactionId);
   }
 
@@ -651,6 +682,7 @@ public class JniFrontend {
    * @param transactionId the id of the transaction to clear.
    */
   public void unregisterTransaction(long transactionId) {
+    Preconditions.checkNotNull(frontend_);
     this.frontend_.unregisterTransaction(transactionId);
   }
 
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index 613fcf9..b63b1f7 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -24,6 +24,8 @@ import time
 from subprocess import check_call
 from tests.util.filesystem_utils import get_fs_path
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import (SkipIf, SkipIfS3, SkipIfABFS, SkipIfADLS,
+    SkipIfIsilon, SkipIfLocal)
 
 LOG = logging.getLogger('test_coordinators')
 LOG.setLevel(level=logging.DEBUG)
@@ -310,3 +312,19 @@ class TestCoordinators(CustomClusterTestSuite):
                          "functional.alltypes b on a.id = b.id;")
     num_hosts = "hosts=10 instances=10"
     assert num_hosts in str(ret)
+
+  @SkipIfS3.hbase
+  @SkipIfABFS.hbase
+  @SkipIfADLS.hbase
+  @SkipIfIsilon.hbase
+  @SkipIfLocal.hbase
+  @SkipIf.skip_hbase
+  @pytest.mark.execute_serially
+  def test_executor_only_hbase(self):
+    """Verifies HBase tables can be scanned by executor only impalads."""
+    self._start_impala_cluster([], cluster_size=3, num_coordinators=1,
+                         use_exclusive_coordinators=True)
+    client = self.cluster.impalads[0].service.create_beeswax_client()
+    query = "select count(*) from functional_hbase.alltypes"
+    result = self.execute_query_expect_success(client, query)
+    assert result.data == ['7300']