You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/24 00:19:27 UTC
[pinot] branch master updated: Fix QuerySchedulerFactory to plug in custom scheduler (#7945)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 428e3f2 Fix QuerySchedulerFactory to plug in custom scheduler (#7945)
428e3f2 is described below
commit 428e3f231a48151edb86ca4c9a4326aecacf5e00
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Dec 23 16:19:08 2021 -0800
Fix QuerySchedulerFactory to plug in custom scheduler (#7945)
---
.../query/scheduler/QuerySchedulerFactory.java | 46 +++++++------
.../query/scheduler/fcfs/BoundedFCFSScheduler.java | 2 +-
.../query/scheduler/fcfs/FCFSQueryScheduler.java | 2 +-
.../query/scheduler/QuerySchedulerFactoryTest.java | 80 ++++++++++++++++++++++
4 files changed, 108 insertions(+), 22 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
index e111c05..86c7170 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
@@ -22,13 +22,13 @@ import com.google.common.base.Preconditions;
import java.lang.reflect.Constructor;
import java.util.concurrent.atomic.LongAccumulator;
import javax.annotation.Nullable;
-import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.scheduler.fcfs.BoundedFCFSScheduler;
import org.apache.pinot.core.query.scheduler.fcfs.FCFSQueryScheduler;
import org.apache.pinot.core.query.scheduler.tokenbucket.TokenPriorityScheduler;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,11 +41,12 @@ public class QuerySchedulerFactory {
}
private static final Logger LOGGER = LoggerFactory.getLogger(QuerySchedulerFactory.class);
- private static final String FCFS_ALGORITHM = "fcfs";
- private static final String DEFAULT_QUERY_SCHEDULER_ALGORITHM = FCFS_ALGORITHM;
+
+ public static final String FCFS_ALGORITHM = "fcfs";
public static final String TOKEN_BUCKET_ALGORITHM = "tokenbucket";
public static final String BOUNDED_FCFS_ALGORITHM = "bounded_fcfs";
public static final String ALGORITHM_NAME_CONFIG_KEY = "name";
+ public static final String DEFAULT_QUERY_SCHEDULER_ALGORITHM = FCFS_ALGORITHM;
/**
* Static factory to instantiate query scheduler based on scheduler configuration.
@@ -60,21 +61,26 @@ public class QuerySchedulerFactory {
Preconditions.checkNotNull(schedulerConfig);
Preconditions.checkNotNull(queryExecutor);
- String schedulerName =
- schedulerConfig.getProperty(ALGORITHM_NAME_CONFIG_KEY, DEFAULT_QUERY_SCHEDULER_ALGORITHM).toLowerCase();
- if (schedulerName.equals(FCFS_ALGORITHM)) {
- LOGGER.info("Using FCFS query scheduler");
- return new FCFSQueryScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
- } else if (schedulerName.equals(TOKEN_BUCKET_ALGORITHM)) {
- LOGGER.info("Using Priority Token Bucket scheduler");
- return TokenPriorityScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
- } else if (schedulerName.equals(BOUNDED_FCFS_ALGORITHM)) {
- return BoundedFCFSScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
+ String schedulerName = schedulerConfig.getProperty(ALGORITHM_NAME_CONFIG_KEY, DEFAULT_QUERY_SCHEDULER_ALGORITHM);
+ QueryScheduler scheduler;
+ switch (schedulerName.toLowerCase()) {
+ case FCFS_ALGORITHM:
+ scheduler = new FCFSQueryScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
+ break;
+ case TOKEN_BUCKET_ALGORITHM:
+ scheduler = TokenPriorityScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
+ break;
+ case BOUNDED_FCFS_ALGORITHM:
+ scheduler = BoundedFCFSScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
+ break;
+ default:
+ scheduler =
+ getQuerySchedulerByClassName(schedulerName, schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
+ break;
}
- // didn't find by name so try by classname
- QueryScheduler scheduler = getQuerySchedulerByClassName(schedulerName, schedulerConfig, queryExecutor);
if (scheduler != null) {
+ LOGGER.info("Using {} scheduler", scheduler.name());
return scheduler;
}
@@ -82,19 +88,19 @@ public class QuerySchedulerFactory {
// because it's better to execute with poor algorithm than completely fail.
// Failure on bad configuration will cause outage vs an inferior algorithm that
// will provide degraded service
-
LOGGER.warn("Scheduler {} not found. Using default FCFS query scheduler", schedulerName);
return new FCFSQueryScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
}
@Nullable
private static QueryScheduler getQuerySchedulerByClassName(String className, PinotConfiguration schedulerConfig,
- QueryExecutor queryExecutor) {
+ QueryExecutor queryExecutor, ServerMetrics serverMetrics, LongAccumulator latestQueryTime) {
try {
- Constructor<?> constructor =
- Class.forName(className).getDeclaredConstructor(Configuration.class, QueryExecutor.class);
+ Constructor<?> constructor = PluginManager.get().loadClass(className)
+ .getDeclaredConstructor(PinotConfiguration.class, QueryExecutor.class, ServerMetrics.class,
+ LongAccumulator.class);
constructor.setAccessible(true);
- return (QueryScheduler) constructor.newInstance(schedulerConfig, queryExecutor);
+ return (QueryScheduler) constructor.newInstance(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
} catch (Exception e) {
LOGGER.error("Failed to instantiate scheduler class by name: {}", className, e);
return null;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
index e7d76de..60495ad 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
@@ -58,6 +58,6 @@ public class BoundedFCFSScheduler extends PriorityScheduler {
@Override
public String name() {
- return "BoundedFCFSScheduler";
+ return "BoundedFCFS";
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
index 0f128e3..3e0d096 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
@@ -68,6 +68,6 @@ public class FCFSQueryScheduler extends QueryScheduler {
@Override
public String name() {
- return "fcfs";
+ return "FCFS";
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
new file mode 100644
index 0000000..a495c6c
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.fcfs.BoundedFCFSScheduler;
+import org.apache.pinot.core.query.scheduler.fcfs.FCFSQueryScheduler;
+import org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
+import org.apache.pinot.core.query.scheduler.tokenbucket.TokenPriorityScheduler;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertTrue;
+
+
+public class QuerySchedulerFactoryTest {
+
+ @Test
+ public void testQuerySchedulerFactory() {
+ QueryExecutor queryExecutor = mock(QueryExecutor.class);
+ ServerMetrics serverMetrics = mock(ServerMetrics.class);
+ LongAccumulator latestQueryTime = mock(LongAccumulator.class);
+
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, QuerySchedulerFactory.FCFS_ALGORITHM);
+ QueryScheduler queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
+ assertTrue(queryScheduler instanceof FCFSQueryScheduler);
+
+ config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, QuerySchedulerFactory.TOKEN_BUCKET_ALGORITHM);
+ queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
+ assertTrue(queryScheduler instanceof TokenPriorityScheduler);
+
+ config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, QuerySchedulerFactory.BOUNDED_FCFS_ALGORITHM);
+ queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
+ assertTrue(queryScheduler instanceof BoundedFCFSScheduler);
+
+ config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, TestQueryScheduler.class.getName());
+ queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
+ assertTrue(queryScheduler instanceof TestQueryScheduler);
+ }
+
+ public static final class TestQueryScheduler extends QueryScheduler {
+
+ public TestQueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ServerMetrics serverMetrics,
+ LongAccumulator latestQueryTime) {
+ super(config, queryExecutor, new UnboundedResourceManager(config), serverMetrics, latestQueryTime);
+ }
+
+ @Override
+ public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String name() {
+ return "Test";
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org