You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/24 00:19:27 UTC

[pinot] branch master updated: Fix QuerySchedulerFactory to plug in custom scheduler (#7945)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 428e3f2  Fix QuerySchedulerFactory to plug in custom scheduler (#7945)
428e3f2 is described below

commit 428e3f231a48151edb86ca4c9a4326aecacf5e00
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Dec 23 16:19:08 2021 -0800

    Fix QuerySchedulerFactory to plug in custom scheduler (#7945)
---
 .../query/scheduler/QuerySchedulerFactory.java     | 46 +++++++------
 .../query/scheduler/fcfs/BoundedFCFSScheduler.java |  2 +-
 .../query/scheduler/fcfs/FCFSQueryScheduler.java   |  2 +-
 .../query/scheduler/QuerySchedulerFactoryTest.java | 80 ++++++++++++++++++++++
 4 files changed, 108 insertions(+), 22 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
index e111c05..86c7170 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
@@ -22,13 +22,13 @@ import com.google.common.base.Preconditions;
 import java.lang.reflect.Constructor;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nullable;
-import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.scheduler.fcfs.BoundedFCFSScheduler;
 import org.apache.pinot.core.query.scheduler.fcfs.FCFSQueryScheduler;
 import org.apache.pinot.core.query.scheduler.tokenbucket.TokenPriorityScheduler;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,11 +41,12 @@ public class QuerySchedulerFactory {
   }
 
   private static final Logger LOGGER = LoggerFactory.getLogger(QuerySchedulerFactory.class);
-  private static final String FCFS_ALGORITHM = "fcfs";
-  private static final String DEFAULT_QUERY_SCHEDULER_ALGORITHM = FCFS_ALGORITHM;
+
+  public static final String FCFS_ALGORITHM = "fcfs";
   public static final String TOKEN_BUCKET_ALGORITHM = "tokenbucket";
   public static final String BOUNDED_FCFS_ALGORITHM = "bounded_fcfs";
   public static final String ALGORITHM_NAME_CONFIG_KEY = "name";
+  public static final String DEFAULT_QUERY_SCHEDULER_ALGORITHM = FCFS_ALGORITHM;
 
   /**
    * Static factory to instantiate query scheduler based on scheduler configuration.
@@ -60,21 +61,26 @@ public class QuerySchedulerFactory {
     Preconditions.checkNotNull(schedulerConfig);
     Preconditions.checkNotNull(queryExecutor);
 
-    String schedulerName =
-        schedulerConfig.getProperty(ALGORITHM_NAME_CONFIG_KEY, DEFAULT_QUERY_SCHEDULER_ALGORITHM).toLowerCase();
-    if (schedulerName.equals(FCFS_ALGORITHM)) {
-      LOGGER.info("Using FCFS query scheduler");
-      return new FCFSQueryScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
-    } else if (schedulerName.equals(TOKEN_BUCKET_ALGORITHM)) {
-      LOGGER.info("Using Priority Token Bucket scheduler");
-      return TokenPriorityScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
-    } else if (schedulerName.equals(BOUNDED_FCFS_ALGORITHM)) {
-      return BoundedFCFSScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
+    String schedulerName = schedulerConfig.getProperty(ALGORITHM_NAME_CONFIG_KEY, DEFAULT_QUERY_SCHEDULER_ALGORITHM);
+    QueryScheduler scheduler;
+    switch (schedulerName.toLowerCase()) {
+      case FCFS_ALGORITHM:
+        scheduler = new FCFSQueryScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
+        break;
+      case TOKEN_BUCKET_ALGORITHM:
+        scheduler = TokenPriorityScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
+        break;
+      case BOUNDED_FCFS_ALGORITHM:
+        scheduler = BoundedFCFSScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
+        break;
+      default:
+        scheduler =
+            getQuerySchedulerByClassName(schedulerName, schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
+        break;
     }
 
-    // didn't find by name so try by classname
-    QueryScheduler scheduler = getQuerySchedulerByClassName(schedulerName, schedulerConfig, queryExecutor);
     if (scheduler != null) {
+      LOGGER.info("Using {} scheduler", scheduler.name());
       return scheduler;
     }
 
@@ -82,19 +88,19 @@ public class QuerySchedulerFactory {
     // because it's better to execute with poor algorithm than completely fail.
     // Failure on bad configuration will cause outage vs an inferior algorithm that
     // will provide degraded service
-
     LOGGER.warn("Scheduler {} not found. Using default FCFS query scheduler", schedulerName);
     return new FCFSQueryScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
   }
 
   @Nullable
   private static QueryScheduler getQuerySchedulerByClassName(String className, PinotConfiguration schedulerConfig,
-      QueryExecutor queryExecutor) {
+      QueryExecutor queryExecutor, ServerMetrics serverMetrics, LongAccumulator latestQueryTime) {
     try {
-      Constructor<?> constructor =
-          Class.forName(className).getDeclaredConstructor(Configuration.class, QueryExecutor.class);
+      Constructor<?> constructor = PluginManager.get().loadClass(className)
+          .getDeclaredConstructor(PinotConfiguration.class, QueryExecutor.class, ServerMetrics.class,
+              LongAccumulator.class);
       constructor.setAccessible(true);
-      return (QueryScheduler) constructor.newInstance(schedulerConfig, queryExecutor);
+      return (QueryScheduler) constructor.newInstance(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
     } catch (Exception e) {
       LOGGER.error("Failed to instantiate scheduler class by name: {}", className, e);
       return null;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
index e7d76de..60495ad 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
@@ -58,6 +58,6 @@ public class BoundedFCFSScheduler extends PriorityScheduler {
 
   @Override
   public String name() {
-    return "BoundedFCFSScheduler";
+    return "BoundedFCFS";
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
index 0f128e3..3e0d096 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
@@ -68,6 +68,6 @@ public class FCFSQueryScheduler extends QueryScheduler {
 
   @Override
   public String name() {
-    return "fcfs";
+    return "FCFS";
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
new file mode 100644
index 0000000..a495c6c
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.fcfs.BoundedFCFSScheduler;
+import org.apache.pinot.core.query.scheduler.fcfs.FCFSQueryScheduler;
+import org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
+import org.apache.pinot.core.query.scheduler.tokenbucket.TokenPriorityScheduler;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertTrue;
+
+
+public class QuerySchedulerFactoryTest {
+
+  @Test
+  public void testQuerySchedulerFactory() {
+    QueryExecutor queryExecutor = mock(QueryExecutor.class);
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    LongAccumulator latestQueryTime = mock(LongAccumulator.class);
+
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, QuerySchedulerFactory.FCFS_ALGORITHM);
+    QueryScheduler queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
+    assertTrue(queryScheduler instanceof FCFSQueryScheduler);
+
+    config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, QuerySchedulerFactory.TOKEN_BUCKET_ALGORITHM);
+    queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
+    assertTrue(queryScheduler instanceof TokenPriorityScheduler);
+
+    config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, QuerySchedulerFactory.BOUNDED_FCFS_ALGORITHM);
+    queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
+    assertTrue(queryScheduler instanceof BoundedFCFSScheduler);
+
+    config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, TestQueryScheduler.class.getName());
+    queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
+    assertTrue(queryScheduler instanceof TestQueryScheduler);
+  }
+
+  public static final class TestQueryScheduler extends QueryScheduler {
+
+    public TestQueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ServerMetrics serverMetrics,
+        LongAccumulator latestQueryTime) {
+      super(config, queryExecutor, new UnboundedResourceManager(config), serverMetrics, latestQueryTime);
+    }
+
+    @Override
+    public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String name() {
+      return "Test";
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org