Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/04/15 02:48:30 UTC

[GitHub] [incubator-doris] EmmyMiao87 commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

EmmyMiao87 commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r850339377


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,224 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row count is greater than the maximum number of rows a single BE should scan per task,
+     * we'll divide the job into subtasks by partition. The relevant values (3700000000L and 600000000L) were derived from testing.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr); NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * If all statistics were collected at the same time, the cluster might be overburdened
+     * and normal query services might be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, finally dividing each job into several subtasks and executing them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();

Review Comment:
   Use poll without remove
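
   A minimal sketch of this suggestion (assuming pendingJobQueue stays a Queue<StatisticsJob>): poll() retrieves and removes the head in one call, so a later explicit remove() becomes unnecessary.

       // Sketch only: poll() returns and removes the head, or null when the queue is empty.
       StatisticsJob pendingJob = pendingJobQueue.poll();
       if (pendingJob == null) {
           return;
       }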



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,224 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row count is greater than the maximum number of rows a single BE should scan per task,
+     * we'll divide the job into subtasks by partition. The relevant values (3700000000L and 600000000L) were derived from testing.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr); NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * If all statistics were collected at the same time, the cluster might be overburdened
+     * and normal query services might be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, finally dividing each job into several subtasks and executing them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();
+            StatisticsJob.JobState jobState = pendingJob.getJobState();
+            // job scheduler is only responsible for moving the job from pending -> scheduler
+            if (jobState == StatisticsJob.JobState.PENDING) {
+                try {
+                    List<StatisticsTask> tasks = this.divide(pendingJob);
+                    ArrayList<StatisticsTask> list = Lists.newArrayList();

Review Comment:
   There is no need to write this kind of temporary code; the scheduler will not actually run anyway.
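
   For illustration, a hedged sketch without the temporary list, assuming divide(pendingJob) returns the complete task list and that addTasks accepts it directly (as in the code before this change):

       // Sketch: hand the divided tasks straight to the task scheduler,
       // without copying them into an intermediate ArrayList first.
       List<StatisticsTask> tasks = divide(pendingJob);
       Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(tasks);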



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,224 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row count is greater than the maximum number of rows a single BE should scan per task,
+     * we'll divide the job into subtasks by partition. The relevant values (3700000000L and 600000000L) were derived from testing.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr); NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * If all statistics were collected at the same time, the cluster might be overburdened
+     * and normal query services might be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, finally dividing each job into several subtasks and executing them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();

Review Comment:
   delete it
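
   Taking the three comments together, the head of runAfterCatalogReady could look roughly like the sketch below (an illustration only, not the final implementation): poll() already removes the head, so the explicit remove() call disappears, and the divided tasks are submitted without a temporary list.

       @Override
       protected void runAfterCatalogReady() {
           // poll() removes the head of the queue, so no separate remove() is needed
           StatisticsJob pendingJob = pendingJobQueue.poll();
           if (pendingJob == null) {
               return;
           }
           // the job scheduler is only responsible for jobs still in the PENDING state
           if (pendingJob.getJobState() != StatisticsJob.JobState.PENDING) {
               return;
           }
           // divide the job into subtasks and submit them directly
           List<StatisticsTask> tasks = divide(pendingJob);
           Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(tasks);
       }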



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

