You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2021/12/16 15:05:28 UTC
[impala] 01/04: IMPALA-11054: Support resource pool polling for frontend
This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit b0c37b66004090e59a895849feabe2bb35097d91
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Wed Dec 8 14:24:39 2021 -0800
IMPALA-11054: Support resource pool polling for frontend
This patch splits the Java class RequestPoolService into two classes,
JniRequestPoolService and RequestPoolService, makes RequestPoolService
a singleton class, and provides an API for the frontend to access the
RequestPoolService instance.
Testing:
- Manually verified that the Planner could access the RequestPoolService
instance with RequestPoolService.getInstance().
- Passed exhaustive tests.
Change-Id: Ia78b1a0574f6b8ad4df5bb0fc9533f218b486e6b
Reviewed-on: http://gerrit.cloudera.org:8080/18078
Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/scheduling/request-pool-service.cc | 36 +++----
be/src/scheduling/request-pool-service.h | 4 +-
.../apache/impala/util/JniRequestPoolService.java | 112 +++++++++++++++++++++
.../org/apache/impala/util/RequestPoolService.java | 91 +++++++++--------
.../apache/impala/util/TestRequestPoolService.java | 4 +-
5 files changed, 180 insertions(+), 67 deletions(-)
diff --git a/be/src/scheduling/request-pool-service.cc b/be/src/scheduling/request-pool-service.cc
index cd78793..90e48d3 100644
--- a/be/src/scheduling/request-pool-service.cc
+++ b/be/src/scheduling/request-pool-service.cc
@@ -31,6 +31,7 @@
#include "util/collection-metrics.h"
#include "util/mem-info.h"
#include "util/parse-util.h"
+#include "util/test-info.h"
#include "util/time.h"
#include "common/names.h"
@@ -115,21 +116,21 @@ RequestPoolService::RequestPoolService(MetricGroup* metrics) :
}
default_pool_only_ = false;
- jmethodID start_id; // RequestPoolService.start(), only called in this method.
+ jmethodID start_id; // JniRequestPoolService.start(), only called in this method.
JniMethodDescriptor methods[] = {
- {"<init>", "(Ljava/lang/String;Ljava/lang/String;)V", &ctor_},
- {"start", "()V", &start_id},
- {"resolveRequestPool", "([B)[B", &resolve_request_pool_id_},
- {"getPoolConfig", "([B)[B", &get_pool_config_id_}};
+ {"<init>", "(Ljava/lang/String;Ljava/lang/String;Z)V", &ctor_},
+ {"start", "()V", &start_id},
+ {"resolveRequestPool", "([B)[B", &resolve_request_pool_id_},
+ {"getPoolConfig", "([B)[B", &get_pool_config_id_}};
JNIEnv* jni_env = JniUtil::GetJNIEnv();
- request_pool_service_class_ =
- jni_env->FindClass("org/apache/impala/util/RequestPoolService");
+ jni_request_pool_service_class_ =
+ jni_env->FindClass("org/apache/impala/util/JniRequestPoolService");
ABORT_IF_EXC(jni_env);
uint32_t num_methods = sizeof(methods) / sizeof(methods[0]);
for (int i = 0; i < num_methods; ++i) {
- ABORT_IF_ERROR(JniUtil::LoadJniMethod(jni_env, request_pool_service_class_,
- &(methods[i])));
+ ABORT_IF_ERROR(
+ JniUtil::LoadJniMethod(jni_env, jni_request_pool_service_class_, &(methods[i])));
}
jstring fair_scheduler_config_path =
@@ -139,12 +140,13 @@ RequestPoolService::RequestPoolService(MetricGroup* metrics) :
jni_env->NewStringUTF(FLAGS_llama_site_path.c_str());
ABORT_IF_EXC(jni_env);
- jobject request_pool_service = jni_env->NewObject(request_pool_service_class_, ctor_,
- fair_scheduler_config_path, llama_site_path);
+ jboolean is_be_test = TestInfo::is_be_test();
+ jobject jni_request_pool_service = jni_env->NewObject(jni_request_pool_service_class_,
+ ctor_, fair_scheduler_config_path, llama_site_path, is_be_test);
ABORT_IF_EXC(jni_env);
- ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, request_pool_service,
- &request_pool_service_));
- jni_env->CallObjectMethod(request_pool_service_, start_id);
+ ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(
+ jni_env, jni_request_pool_service, &jni_request_pool_service_));
+ jni_env->CallObjectMethod(jni_request_pool_service_, start_id);
ABORT_IF_EXC(jni_env);
}
@@ -168,8 +170,8 @@ Status RequestPoolService::ResolveRequestPool(const TQueryCtx& ctx,
params.__set_requested_pool(requested_pool);
TResolveRequestPoolResult result;
int64_t start_time = MonotonicMillis();
- Status status = JniUtil::CallJniMethod(request_pool_service_, resolve_request_pool_id_,
- params, &result);
+ Status status = JniUtil::CallJniMethod(
+ jni_request_pool_service_, resolve_request_pool_id_, params, &result);
resolve_pool_ms_metric_->Update(MonotonicMillis() - start_time);
if (result.status.status_code != TErrorCode::OK) {
@@ -205,7 +207,7 @@ Status RequestPoolService::GetPoolConfig(const string& pool_name,
TPoolConfigParams params;
params.__set_pool(pool_name);
RETURN_IF_ERROR(JniUtil::CallJniMethod(
- request_pool_service_, get_pool_config_id_, params, pool_config));
+ jni_request_pool_service_, get_pool_config_id_, params, pool_config));
if (FLAGS_disable_pool_max_requests) pool_config->__set_max_requests(-1);
if (FLAGS_disable_pool_mem_limits) pool_config->__set_max_mem_resources(-1);
return Status::OK();
diff --git a/be/src/scheduling/request-pool-service.h b/be/src/scheduling/request-pool-service.h
index ad38900..5fa0979 100644
--- a/be/src/scheduling/request-pool-service.h
+++ b/be/src/scheduling/request-pool-service.h
@@ -69,9 +69,9 @@ class RequestPoolService {
/// The following members are not initialized if default_pool_only_ is true.
/// Descriptor of Java RequestPoolService class itself, used to create a new instance.
- jclass request_pool_service_class_;
+ jclass jni_request_pool_service_class_;
/// Instance of org.apache.impala.util.RequestPoolService
- jobject request_pool_service_;
+ jobject jni_request_pool_service_;
jmethodID resolve_request_pool_id_; // RequestPoolService.resolveRequestPool()
jmethodID get_pool_config_id_; // RequestPoolService.getPoolConfig()
jmethodID ctor_;
diff --git a/fe/src/main/java/org/apache/impala/util/JniRequestPoolService.java b/fe/src/main/java/org/apache/impala/util/JniRequestPoolService.java
new file mode 100644
index 0000000..8a5c3cf
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/JniRequestPoolService.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.common.JniUtil;
+import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TPoolConfigParams;
+import org.apache.impala.thrift.TPoolConfig;
+import org.apache.impala.thrift.TResolveRequestPoolParams;
+import org.apache.impala.thrift.TResolveRequestPoolResult;
+import org.apache.impala.thrift.TStatus;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JNI adapter for RequestPoolService: converts Thrift-serialized byte[] to/from C++.
+ */
+public class JniRequestPoolService {
+ final static Logger LOG = LoggerFactory.getLogger(JniRequestPoolService.class);
+
+ private final static TBinaryProtocol.Factory protocolFactory_ =
+ new TBinaryProtocol.Factory();
+
+ // The RequestPoolService singleton, or a fresh non-shared instance if isBackendTest.
+ private final RequestPoolService requestPoolService_;
+
+ /**
+ * Obtains the RequestPoolService configured with the specified fair-scheduler.xml and
+ * llama-site.xml via RequestPoolService.getInstance(). Invoked from C++ through JNI.
+ * @param fsAllocationPath path to the fair scheduler allocation file; must be non-null.
+ * @param sitePath path to the configuration file.
+ * @param isBackendTest if true, a fresh instance is created instead of the singleton.
+ */
+ JniRequestPoolService(
+ final String fsAllocationPath, final String sitePath, boolean isBackendTest) {
+ Preconditions.checkNotNull(fsAllocationPath);
+ requestPoolService_ =
+ RequestPoolService.getInstance(fsAllocationPath, sitePath, isBackendTest);
+ }
+
+ /**
+ * Starts the wrapped RequestPoolService instance (initial configuration load and
+ * automatic reloading are performed by RequestPoolService.start()).
+ */
+ @SuppressWarnings("unused") // called from C++
+ public void start() {
+ requestPoolService_.start();
+ }
+
+ /**
+ * Resolves a user and pool to the pool specified by the allocation placement policy
+ * and checks if the user is authorized to submit requests.
+ * Throws InternalException if the result cannot be serialized.
+ * @param thriftResolvePoolParams Serialized {@link TResolveRequestPoolParams}
+ * @return serialized {@link TResolveRequestPoolResult}
+ */
+ @SuppressWarnings("unused") // called from C++
+ public byte[] resolveRequestPool(byte[] thriftResolvePoolParams)
+ throws ImpalaException {
+ TResolveRequestPoolParams resolvePoolParams = new TResolveRequestPoolParams();
+ JniUtil.deserializeThrift(
+ protocolFactory_, resolvePoolParams, thriftResolvePoolParams);
+ TResolveRequestPoolResult result =
+ requestPoolService_.resolveRequestPool(resolvePoolParams);
+ try {
+ return new TSerializer(protocolFactory_).serialize(result);
+ } catch (TException e) {
+ throw new InternalException(e.getMessage());
+ }
+ }
+
+ /**
+ * Gets the pool configuration values for the specified pool.
+ * Throws InternalException if the result cannot be serialized.
+ * @param thriftPoolConfigParams Serialized {@link TPoolConfigParams}
+ * @return serialized {@link TPoolConfig}
+ */
+ @SuppressWarnings("unused") // called from C++
+ public byte[] getPoolConfig(byte[] thriftPoolConfigParams) throws ImpalaException {
+ TPoolConfigParams poolConfigParams = new TPoolConfigParams();
+ JniUtil.deserializeThrift(protocolFactory_, poolConfigParams, thriftPoolConfigParams);
+ TPoolConfig result = requestPoolService_.getPoolConfig(poolConfigParams.getPool());
+ try {
+ return new TSerializer(protocolFactory_).serialize(result);
+ } catch (TException e) {
+ throw new InternalException(e.getMessage());
+ }
+ }
+}
diff --git a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
index 58fe6d5..828fe37 100644
--- a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
+++ b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
@@ -28,9 +28,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,8 +77,6 @@ import com.google.common.collect.Lists;
public class RequestPoolService {
final static Logger LOG = LoggerFactory.getLogger(RequestPoolService.class);
- private final static TBinaryProtocol.Factory protocolFactory_ =
- new TBinaryProtocol.Factory();
// Used to ensure start() has been called before any other methods can be used.
private final AtomicBoolean running_;
@@ -152,6 +147,9 @@ public class RequestPoolService {
// URL of the configuration file.
private final URL confUrl_;
+ // Reference of single instance of RequestPoolService.
+ private static RequestPoolService single_instance_ = null;
+
/**
* Updates the configuration when the file changes. The file is confUrl_
* and it will exist when this is created (or RequestPoolService will not start). If
@@ -170,13 +168,42 @@ public class RequestPoolService {
}
/**
+ * Returns the process-wide RequestPoolService, creating it on the first non-test call;
+ * later non-test calls ignore their path arguments. Used via JNI and by unit tests.
+ */
+ public static RequestPoolService getInstance(
+ final String fsAllocationPath, final String sitePath, boolean isTest) {
+ // Tests may build pools with different configurations in one process, so each test
+ // call returns a new instance that is NOT stored in single_instance_.
+ // NOTE(review): lazy init below is unsynchronized; assumes callers are serialized.
+ if (isTest) return (new RequestPoolService(fsAllocationPath, sitePath));
+
+ if (single_instance_ == null) {
+ single_instance_ = new RequestPoolService(fsAllocationPath, sitePath);
+ }
+ return single_instance_;
+ }
+
+ /**
+ * Returns the singleton created by the backend, for frontend Java code. Returns null
+ * if none exists (e.g. only the default pool is configured, or only isTest instances
+ * were created), so callers must check for null; each null return logs at INFO.
+ */
+ public static RequestPoolService getInstance() {
+ if (single_instance_ == null) {
+ LOG.info("Default pool only, scheduler allocation is not specified.");
+ }
+ return single_instance_;
+ }
+
+ /**
* Creates a RequestPoolService instance with a configuration containing the specified
* fair-scheduler.xml and llama-site.xml.
*
* @param fsAllocationPath path to the fair scheduler allocation file.
* @param sitePath path to the configuration file.
*/
- RequestPoolService(final String fsAllocationPath, final String sitePath) {
+ private RequestPoolService(final String fsAllocationPath, final String sitePath) {
Preconditions.checkNotNull(fsAllocationPath);
running_ = new AtomicBoolean(false);
allocationConf_ = new AtomicReference<>();
@@ -274,31 +301,12 @@ public class RequestPoolService {
* Resolves a user and pool to the pool specified by the allocation placement policy
* and checks if the user is authorized to submit requests.
*
- * @param thriftResolvePoolParams Serialized {@link TResolveRequestPoolParams}
- * @return serialized {@link TResolveRequestPoolResult}
+ * @param resolvePoolParams {@link TResolveRequestPoolParams}
+ * @return {@link TResolveRequestPoolResult}
*/
- @SuppressWarnings("unused") // called from C++
- public byte[] resolveRequestPool(byte[] thriftResolvePoolParams)
- throws ImpalaException {
- TResolveRequestPoolParams resolvePoolParams = new TResolveRequestPoolParams();
- JniUtil.deserializeThrift(protocolFactory_, resolvePoolParams,
- thriftResolvePoolParams);
- TResolveRequestPoolResult result = resolveRequestPool(resolvePoolParams);
- if (LOG.isTraceEnabled()) {
- LOG.trace("resolveRequestPool(pool={}, user={}): resolved_pool={}, has_access={}",
- resolvePoolParams.getRequested_pool(), resolvePoolParams.getUser(),
- result.resolved_pool, result.has_access);
- }
- try {
- return new TSerializer(protocolFactory_).serialize(result);
- } catch (TException e) {
- throw new InternalException(e.getMessage());
- }
- }
-
- @VisibleForTesting
- TResolveRequestPoolResult resolveRequestPool(
+ public TResolveRequestPoolResult resolveRequestPool(
TResolveRequestPoolParams resolvePoolParams) throws InternalException {
+ Preconditions.checkState(running_.get());
String requestedPool = resolvePoolParams.getRequested_pool();
String user = resolvePoolParams.getUser();
TResolveRequestPoolResult result = new TResolveRequestPoolResult();
@@ -335,31 +343,22 @@ public class RequestPoolService {
result.setHas_access(hasAccess(pool, user));
result.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList()));
}
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("resolveRequestPool(pool={}, user={}): resolved_pool={}, has_access={}",
+ resolvePoolParams.getRequested_pool(), resolvePoolParams.getUser(),
+ result.resolved_pool, result.has_access);
+ }
return result;
}
/**
* Gets the pool configuration values for the specified pool.
*
- * @param thriftPoolConfigParams Serialized {@link TPoolConfigParams}
- * @return serialized {@link TPoolConfig}
+ * @param pool name.
+ * @return {@link TPoolConfig}
*/
- @SuppressWarnings("unused") // called from C++
- public byte[] getPoolConfig(byte[] thriftPoolConfigParams) throws ImpalaException {
+ public TPoolConfig getPoolConfig(String pool) {
Preconditions.checkState(running_.get());
- TPoolConfigParams poolConfigParams = new TPoolConfigParams();
- JniUtil.deserializeThrift(protocolFactory_, poolConfigParams,
- thriftPoolConfigParams);
- TPoolConfig result = getPoolConfig(poolConfigParams.getPool());
- try {
- return new TSerializer(protocolFactory_).serialize(result);
- } catch (TException e) {
- throw new InternalException(e.getMessage());
- }
- }
-
- @VisibleForTesting
- TPoolConfig getPoolConfig(String pool) {
TPoolConfig result = new TPoolConfig();
long maxMemoryMb = allocationConf_.get().getMaxResources(pool).getMemory();
result.setMax_mem_resources(
diff --git a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
index 436984c..efc9412 100644
--- a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
+++ b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
@@ -105,8 +105,8 @@ public class TestRequestPoolService {
Files.copy(getClasspathFile(llamaConfFile), llamaConfFile_);
llamaConfPath = llamaConfFile_.getAbsolutePath();
}
- poolService_ = new RequestPoolService(allocationConfFile_.getAbsolutePath(),
- llamaConfPath);
+ poolService_ = RequestPoolService.getInstance(
+ allocationConfFile_.getAbsolutePath(), llamaConfPath, /* isTest */ true);
// Lower the wait times on the AllocationFileLoaderService and RequestPoolService so
// the test doesn't have to wait very long to test that file changes are reloaded.