You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@impala.apache.org by wz...@apache.org on 2021/12/16 15:05:28 UTC

[impala] 01/04: IMPALA-11054: Support resource pool polling for frontend

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b0c37b66004090e59a895849feabe2bb35097d91
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Wed Dec 8 14:24:39 2021 -0800

    IMPALA-11054: Support resource pool polling for frontend
    
    This patch splits the Java class RequestPoolService into two classes,
    JniRequestPoolService and RequestPoolService, makes RequestPoolService
    a singleton class, and provides an API for the frontend to access the
    RequestPoolService instance.
    
    Testing:
      - Manually verified that the Planner can access the RequestPoolService
        instance via RequestPoolService.getInstance().
      - Passed exhaustive tests.
    
    Change-Id: Ia78b1a0574f6b8ad4df5bb0fc9533f218b486e6b
    Reviewed-on: http://gerrit.cloudera.org:8080/18078
    Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/request-pool-service.cc          |  36 +++----
 be/src/scheduling/request-pool-service.h           |   4 +-
 .../apache/impala/util/JniRequestPoolService.java  | 112 +++++++++++++++++++++
 .../org/apache/impala/util/RequestPoolService.java |  91 +++++++++--------
 .../apache/impala/util/TestRequestPoolService.java |   4 +-
 5 files changed, 180 insertions(+), 67 deletions(-)

diff --git a/be/src/scheduling/request-pool-service.cc b/be/src/scheduling/request-pool-service.cc
index cd78793..90e48d3 100644
--- a/be/src/scheduling/request-pool-service.cc
+++ b/be/src/scheduling/request-pool-service.cc
@@ -31,6 +31,7 @@
 #include "util/collection-metrics.h"
 #include "util/mem-info.h"
 #include "util/parse-util.h"
+#include "util/test-info.h"
 #include "util/time.h"
 
 #include "common/names.h"
@@ -115,21 +116,21 @@ RequestPoolService::RequestPoolService(MetricGroup* metrics) :
   }
   default_pool_only_ = false;
 
-  jmethodID start_id; // RequestPoolService.start(), only called in this method.
+  jmethodID start_id; // JniRequestPoolService.start(), only called in this method.
   JniMethodDescriptor methods[] = {
-    {"<init>", "(Ljava/lang/String;Ljava/lang/String;)V", &ctor_},
-    {"start", "()V", &start_id},
-    {"resolveRequestPool", "([B)[B", &resolve_request_pool_id_},
-    {"getPoolConfig", "([B)[B", &get_pool_config_id_}};
+      {"<init>", "(Ljava/lang/String;Ljava/lang/String;Z)V", &ctor_},
+      {"start", "()V", &start_id},
+      {"resolveRequestPool", "([B)[B", &resolve_request_pool_id_},
+      {"getPoolConfig", "([B)[B", &get_pool_config_id_}};
 
   JNIEnv* jni_env = JniUtil::GetJNIEnv();
-  request_pool_service_class_ =
-    jni_env->FindClass("org/apache/impala/util/RequestPoolService");
+  jni_request_pool_service_class_ =
+      jni_env->FindClass("org/apache/impala/util/JniRequestPoolService");
   ABORT_IF_EXC(jni_env);
   uint32_t num_methods = sizeof(methods) / sizeof(methods[0]);
   for (int i = 0; i < num_methods; ++i) {
-    ABORT_IF_ERROR(JniUtil::LoadJniMethod(jni_env, request_pool_service_class_,
-        &(methods[i])));
+    ABORT_IF_ERROR(
+        JniUtil::LoadJniMethod(jni_env, jni_request_pool_service_class_, &(methods[i])));
   }
 
   jstring fair_scheduler_config_path =
@@ -139,12 +140,13 @@ RequestPoolService::RequestPoolService(MetricGroup* metrics) :
       jni_env->NewStringUTF(FLAGS_llama_site_path.c_str());
   ABORT_IF_EXC(jni_env);
 
-  jobject request_pool_service = jni_env->NewObject(request_pool_service_class_, ctor_,
-      fair_scheduler_config_path, llama_site_path);
+  jboolean is_be_test = TestInfo::is_be_test();
+  jobject jni_request_pool_service = jni_env->NewObject(jni_request_pool_service_class_,
+      ctor_, fair_scheduler_config_path, llama_site_path, is_be_test);
   ABORT_IF_EXC(jni_env);
-  ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, request_pool_service,
-      &request_pool_service_));
-  jni_env->CallObjectMethod(request_pool_service_, start_id);
+  ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(
+      jni_env, jni_request_pool_service, &jni_request_pool_service_));
+  jni_env->CallObjectMethod(jni_request_pool_service_, start_id);
   ABORT_IF_EXC(jni_env);
 }
 
@@ -168,8 +170,8 @@ Status RequestPoolService::ResolveRequestPool(const TQueryCtx& ctx,
   params.__set_requested_pool(requested_pool);
   TResolveRequestPoolResult result;
   int64_t start_time = MonotonicMillis();
-  Status status = JniUtil::CallJniMethod(request_pool_service_, resolve_request_pool_id_,
-      params, &result);
+  Status status = JniUtil::CallJniMethod(
+      jni_request_pool_service_, resolve_request_pool_id_, params, &result);
   resolve_pool_ms_metric_->Update(MonotonicMillis() - start_time);
 
   if (result.status.status_code != TErrorCode::OK) {
@@ -205,7 +207,7 @@ Status RequestPoolService::GetPoolConfig(const string& pool_name,
   TPoolConfigParams params;
   params.__set_pool(pool_name);
   RETURN_IF_ERROR(JniUtil::CallJniMethod(
-        request_pool_service_, get_pool_config_id_, params, pool_config));
+      jni_request_pool_service_, get_pool_config_id_, params, pool_config));
   if (FLAGS_disable_pool_max_requests) pool_config->__set_max_requests(-1);
   if (FLAGS_disable_pool_mem_limits) pool_config->__set_max_mem_resources(-1);
   return Status::OK();
diff --git a/be/src/scheduling/request-pool-service.h b/be/src/scheduling/request-pool-service.h
index ad38900..5fa0979 100644
--- a/be/src/scheduling/request-pool-service.h
+++ b/be/src/scheduling/request-pool-service.h
@@ -69,9 +69,9 @@ class RequestPoolService {
 
   /// The following members are not initialized if default_pool_only_ is true.
   /// Descriptor of Java RequestPoolService class itself, used to create a new instance.
-  jclass request_pool_service_class_;
+  jclass jni_request_pool_service_class_;
   /// Instance of org.apache.impala.util.RequestPoolService
-  jobject request_pool_service_;
+  jobject jni_request_pool_service_;
   jmethodID resolve_request_pool_id_;  // RequestPoolService.resolveRequestPool()
   jmethodID get_pool_config_id_;  // RequestPoolService.getPoolConfig()
   jmethodID ctor_;
diff --git a/fe/src/main/java/org/apache/impala/util/JniRequestPoolService.java b/fe/src/main/java/org/apache/impala/util/JniRequestPoolService.java
new file mode 100644
index 0000000..8a5c3cf
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/JniRequestPoolService.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.common.JniUtil;
+import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TPoolConfigParams;
+import org.apache.impala.thrift.TPoolConfig;
+import org.apache.impala.thrift.TResolveRequestPoolParams;
+import org.apache.impala.thrift.TResolveRequestPoolResult;
+import org.apache.impala.thrift.TStatus;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JNI interface for RequestPoolService.
+ */
+public class JniRequestPoolService {
+  final static Logger LOG = LoggerFactory.getLogger(JniRequestPoolService.class);
+
+  private final static TBinaryProtocol.Factory protocolFactory_ =
+      new TBinaryProtocol.Factory();
+
+  // A single instance is created by the backend and lasts the duration of the process.
+  private final RequestPoolService requestPoolService_;
+
+  /**
+   * Creates a RequestPoolService instance with a configuration containing the specified
+   * fair-scheduler.xml and llama-site.xml.
+   *
+   * @param fsAllocationPath path to the fair scheduler allocation file.
+   * @param sitePath path to the configuration file.
+   */
+  JniRequestPoolService(
+      final String fsAllocationPath, final String sitePath, boolean isBackendTest) {
+    Preconditions.checkNotNull(fsAllocationPath);
+    requestPoolService_ =
+        RequestPoolService.getInstance(fsAllocationPath, sitePath, isBackendTest);
+  }
+
+  /**
+   * Starts the RequestPoolService instance. It does the initial loading of the
+   * configuration and starts the automatic reloading.
+   */
+  @SuppressWarnings("unused") // called from C++
+  public void start() {
+    requestPoolService_.start();
+  }
+
+  /**
+   * Resolves a user and pool to the pool specified by the allocation placement policy
+   * and checks if the user is authorized to submit requests.
+   *
+   * @param thriftResolvePoolParams Serialized {@link TResolveRequestPoolParams}
+   * @return serialized {@link TResolveRequestPoolResult}
+   */
+  @SuppressWarnings("unused") // called from C++
+  public byte[] resolveRequestPool(byte[] thriftResolvePoolParams)
+      throws ImpalaException {
+    TResolveRequestPoolParams resolvePoolParams = new TResolveRequestPoolParams();
+    JniUtil.deserializeThrift(
+        protocolFactory_, resolvePoolParams, thriftResolvePoolParams);
+    TResolveRequestPoolResult result =
+        requestPoolService_.resolveRequestPool(resolvePoolParams);
+    try {
+      return new TSerializer(protocolFactory_).serialize(result);
+    } catch (TException e) {
+      throw new InternalException(e.getMessage());
+    }
+  }
+
+  /**
+   * Gets the pool configuration values for the specified pool.
+   *
+   * @param thriftPoolConfigParams Serialized {@link TPoolConfigParams}
+   * @return serialized {@link TPoolConfig}
+   */
+  @SuppressWarnings("unused") // called from C++
+  public byte[] getPoolConfig(byte[] thriftPoolConfigParams) throws ImpalaException {
+    TPoolConfigParams poolConfigParams = new TPoolConfigParams();
+    JniUtil.deserializeThrift(protocolFactory_, poolConfigParams, thriftPoolConfigParams);
+    TPoolConfig result = requestPoolService_.getPoolConfig(poolConfigParams.getPool());
+    try {
+      return new TSerializer(protocolFactory_).serialize(result);
+    } catch (TException e) {
+      throw new InternalException(e.getMessage());
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
index 58fe6d5..828fe37 100644
--- a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
+++ b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
@@ -28,9 +28,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,8 +77,6 @@ import com.google.common.collect.Lists;
 public class RequestPoolService {
   final static Logger LOG = LoggerFactory.getLogger(RequestPoolService.class);
 
-  private final static TBinaryProtocol.Factory protocolFactory_ =
-      new TBinaryProtocol.Factory();
   // Used to ensure start() has been called before any other methods can be used.
   private final AtomicBoolean running_;
 
@@ -152,6 +147,9 @@ public class RequestPoolService {
   // URL of the configuration file.
   private final URL confUrl_;
 
+  // Reference of single instance of RequestPoolService.
+  private static RequestPoolService single_instance_ = null;
+
   /**
    * Updates the configuration when the file changes. The file is confUrl_
    * and it will exist when this is created (or RequestPoolService will not start). If
@@ -170,13 +168,42 @@ public class RequestPoolService {
   }
 
   /**
+   * Static method to create singleton instance of RequestPoolService class.
+   * This API is called by backend code through JNI, or called by unit-test code.
+   */
+  public static RequestPoolService getInstance(
+      final String fsAllocationPath, final String sitePath, boolean isTest) {
+    // For frontend and backend tests, different request pools could be created with
+    // different configurations in one process so we have to allow multiple instances
+    // to be created for frontend and backend tests.
+    if (isTest) return (new RequestPoolService(fsAllocationPath, sitePath));
+
+    if (single_instance_ == null) {
+      single_instance_ = new RequestPoolService(fsAllocationPath, sitePath);
+    }
+    return single_instance_;
+  }
+
+  /**
+   * Static method to return singleton instance of RequestPoolService class.
+   * This API is called by frontend Java code. An instance should be already created
+   * by backend before this API is called except only default pool is used.
+   */
+  public static RequestPoolService getInstance() {
+    if (single_instance_ == null) {
+      LOG.info("Default pool only, scheduler allocation is not specified.");
+    }
+    return single_instance_;
+  }
+
+  /**
    * Creates a RequestPoolService instance with a configuration containing the specified
    * fair-scheduler.xml and llama-site.xml.
    *
    * @param fsAllocationPath path to the fair scheduler allocation file.
    * @param sitePath path to the configuration file.
    */
-  RequestPoolService(final String fsAllocationPath, final String sitePath) {
+  private RequestPoolService(final String fsAllocationPath, final String sitePath) {
     Preconditions.checkNotNull(fsAllocationPath);
     running_ = new AtomicBoolean(false);
     allocationConf_ = new AtomicReference<>();
@@ -274,31 +301,12 @@ public class RequestPoolService {
    * Resolves a user and pool to the pool specified by the allocation placement policy
    * and checks if the user is authorized to submit requests.
    *
-   * @param thriftResolvePoolParams Serialized {@link TResolveRequestPoolParams}
-   * @return serialized {@link TResolveRequestPoolResult}
+   * @param resolvePoolParams {@link TResolveRequestPoolParams}
+   * @return {@link TResolveRequestPoolResult}
    */
-  @SuppressWarnings("unused") // called from C++
-  public byte[] resolveRequestPool(byte[] thriftResolvePoolParams)
-      throws ImpalaException {
-    TResolveRequestPoolParams resolvePoolParams = new TResolveRequestPoolParams();
-    JniUtil.deserializeThrift(protocolFactory_, resolvePoolParams,
-        thriftResolvePoolParams);
-    TResolveRequestPoolResult result = resolveRequestPool(resolvePoolParams);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("resolveRequestPool(pool={}, user={}): resolved_pool={}, has_access={}",
-          resolvePoolParams.getRequested_pool(), resolvePoolParams.getUser(),
-          result.resolved_pool, result.has_access);
-    }
-    try {
-      return new TSerializer(protocolFactory_).serialize(result);
-    } catch (TException e) {
-      throw new InternalException(e.getMessage());
-    }
-  }
-
-  @VisibleForTesting
-  TResolveRequestPoolResult resolveRequestPool(
+  public TResolveRequestPoolResult resolveRequestPool(
       TResolveRequestPoolParams resolvePoolParams) throws InternalException {
+    Preconditions.checkState(running_.get());
     String requestedPool = resolvePoolParams.getRequested_pool();
     String user = resolvePoolParams.getUser();
     TResolveRequestPoolResult result = new TResolveRequestPoolResult();
@@ -335,31 +343,22 @@ public class RequestPoolService {
       result.setHas_access(hasAccess(pool, user));
       result.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList()));
     }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("resolveRequestPool(pool={}, user={}): resolved_pool={}, has_access={}",
+          resolvePoolParams.getRequested_pool(), resolvePoolParams.getUser(),
+          result.resolved_pool, result.has_access);
+    }
     return result;
   }
 
   /**
    * Gets the pool configuration values for the specified pool.
    *
-   * @param thriftPoolConfigParams Serialized {@link TPoolConfigParams}
-   * @return serialized {@link TPoolConfig}
+   * @param pool name.
+   * @return {@link TPoolConfig}
    */
-  @SuppressWarnings("unused") // called from C++
-  public byte[] getPoolConfig(byte[] thriftPoolConfigParams) throws ImpalaException {
+  public TPoolConfig getPoolConfig(String pool) {
     Preconditions.checkState(running_.get());
-    TPoolConfigParams poolConfigParams = new TPoolConfigParams();
-    JniUtil.deserializeThrift(protocolFactory_, poolConfigParams,
-        thriftPoolConfigParams);
-    TPoolConfig result = getPoolConfig(poolConfigParams.getPool());
-    try {
-      return new TSerializer(protocolFactory_).serialize(result);
-    } catch (TException e) {
-      throw new InternalException(e.getMessage());
-    }
-  }
-
-  @VisibleForTesting
-  TPoolConfig getPoolConfig(String pool) {
     TPoolConfig result = new TPoolConfig();
     long maxMemoryMb = allocationConf_.get().getMaxResources(pool).getMemory();
     result.setMax_mem_resources(
diff --git a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
index 436984c..efc9412 100644
--- a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
+++ b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
@@ -105,8 +105,8 @@ public class TestRequestPoolService {
       Files.copy(getClasspathFile(llamaConfFile), llamaConfFile_);
       llamaConfPath = llamaConfFile_.getAbsolutePath();
     }
-    poolService_ = new RequestPoolService(allocationConfFile_.getAbsolutePath(),
-        llamaConfPath);
+    poolService_ = RequestPoolService.getInstance(
+        allocationConfFile_.getAbsolutePath(), llamaConfPath, /* isTest */ true);
 
     // Lower the wait times on the AllocationFileLoaderService and RequestPoolService so
     // the test doesn't have to wait very long to test that file changes are reloaded.