You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/04/06 10:33:33 UTC

[GitHub] [incubator-doris] morrySnow commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

morrySnow commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r843736906


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,187 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.UserException;
 
+import com.clearspring.analytics.util.Lists;
+import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.glassfish.jersey.internal.guava.Sets;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
 /*
 Used to store statistics job info,
 including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
-        CANCELLED
+        CANCELLED,
+        FAILED
     }
 
-    private long id = -1;
+    private final long id = Catalog.getCurrentCatalog().getNextId();

Review Comment:
   should we just assign id when the job could run?



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java:
##########
@@ -35,39 +44,74 @@
 public class StatisticsJobManager {
     private static final Logger LOG = LogManager.getLogger(StatisticsJobManager.class);
 
-    // statistics job
-    private Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap();
+    /**
+     * save statistics job status information
+     */
+    private final Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap();
 
-    public void createStatisticsJob(AnalyzeStmt analyzeStmt) {
-        // step0: init statistics job by analyzeStmt
+    public Map<Long, StatisticsJob> getIdToStatisticsJob() {
+        return this.idToStatisticsJob;
+    }
+
+    public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws UserException {
+        // step1: init statistics job by analyzeStmt
         StatisticsJob statisticsJob = StatisticsJob.fromAnalyzeStmt(analyzeStmt);
-        // step1: get statistics to be analyzed
-        Set<Long> tableIdList = statisticsJob.relatedTableId();
+
         // step2: check restrict
-        checkRestrict(tableIdList);
-        // step3: check permission
-        checkPermission();
-        // step4: create it
-        createStatisticsJob(statisticsJob);
+        this.checkRestrict(statisticsJob.getDbId(), statisticsJob.relatedTableId());
+
+        // step3: create it
+        this.createStatisticsJob(statisticsJob);
     }
 
-    public void createStatisticsJob(StatisticsJob statisticsJob) {
-        idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
+    public void createStatisticsJob(StatisticsJob statisticsJob) throws DdlException {
+        this.idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
         try {
             Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob);
         } catch (IllegalStateException e) {
-            LOG.info("The pending statistics job is full. Please submit it again later.");
+            throw new DdlException("The pending statistics job is full, Please submit it again later.");
         }
     }
 
-    // Rule1: The same table cannot have two unfinished statistics jobs
-    // Rule2: The unfinished statistics job could not more then Config.max_statistics_job_num
-    // Rule3: The job for external table is not supported
-    private void checkRestrict(Set<Long> tableIdList) {
-        // TODO
-    }
+    /**
+     * The statistical job has the following restrict:
+     * - Rule1: The same table cannot have two unfinished statistics jobs
+     * - Rule2: The unfinished statistics job could not more then Config.max_statistics_job_num
+     * - Rule3: The job for external table is not supported
+     */
+    private synchronized void checkRestrict(long dbId, Set<Long> tableIds) throws AnalysisException {
+        Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId);
 
-    private void checkPermission() {
-        // TODO
+        // check table type
+        for (Long tableId : tableIds) {
+            Table table = db.getTableOrAnalysisException(tableId);
+            if (table.getType() != Table.TableType.OLAP) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_OLAP_TABLE, db.getFullName(),table.getName(), "ANALYZE");
+            }
+        }
+
+        int unfinishedJobs = 0;
+
+        // check table unfinished job
+        for (Map.Entry<Long, StatisticsJob> jobEntry : this.idToStatisticsJob.entrySet()) {
+            StatisticsJob statisticsJob = jobEntry.getValue();
+            StatisticsJob.JobState jobState = statisticsJob.getJobState();
+            List<Long> tableIdList = statisticsJob.getTableIds();
+            if (jobState == StatisticsJob.JobState.PENDING
+                    || jobState == StatisticsJob.JobState.SCHEDULING
+                    || jobState == StatisticsJob.JobState.RUNNING) {
+                for (Long tableId : tableIds) {
+                    if (tableIdList.contains(tableId)) {
+                        throw new AnalysisException("The table(id=" + tableId + ") have two unfinished statistics jobs");

Review Comment:
   i think 'two unfinished job' lead to misunderstand here.
   remove two is better for me



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java:
##########
@@ -35,39 +44,74 @@
 public class StatisticsJobManager {
     private static final Logger LOG = LogManager.getLogger(StatisticsJobManager.class);
 
-    // statistics job
-    private Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap();
+    /**
+     * save statistics job status information
+     */
+    private final Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap();
 
-    public void createStatisticsJob(AnalyzeStmt analyzeStmt) {
-        // step0: init statistics job by analyzeStmt
+    public Map<Long, StatisticsJob> getIdToStatisticsJob() {
+        return this.idToStatisticsJob;
+    }
+
+    public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws UserException {
+        // step1: init statistics job by analyzeStmt
         StatisticsJob statisticsJob = StatisticsJob.fromAnalyzeStmt(analyzeStmt);
-        // step1: get statistics to be analyzed
-        Set<Long> tableIdList = statisticsJob.relatedTableId();
+
         // step2: check restrict
-        checkRestrict(tableIdList);
-        // step3: check permission
-        checkPermission();
-        // step4: create it
-        createStatisticsJob(statisticsJob);
+        this.checkRestrict(statisticsJob.getDbId(), statisticsJob.relatedTableId());
+
+        // step3: create it
+        this.createStatisticsJob(statisticsJob);
     }
 
-    public void createStatisticsJob(StatisticsJob statisticsJob) {
-        idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
+    public void createStatisticsJob(StatisticsJob statisticsJob) throws DdlException {
+        this.idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
         try {
             Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob);
         } catch (IllegalStateException e) {
-            LOG.info("The pending statistics job is full. Please submit it again later.");
+            throw new DdlException("The pending statistics job is full, Please submit it again later.");

Review Comment:
   i think we also need to log this info for problem tracking



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,217 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();
+            StatisticsJob.JobState jobState = pendingJob.getJobState();
+            // job scheduler is only responsible for moving the job from pending -> scheduler
+            if (jobState == StatisticsJob.JobState.PENDING) {
+                pendingJob.setScheduleTime(System.currentTimeMillis());
+                try {
+                    List<StatisticsTask> tasks = this.divide(pendingJob);
+                    ArrayList<StatisticsTask> list = Lists.newArrayList();
+                    for (StatisticsTask task : tasks) {
+                        // TODO now only support meta task
+                        if (task instanceof MetaStatisticsTask){

Review Comment:
   nit: blank after right parenthesis



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,217 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();
+            StatisticsJob.JobState jobState = pendingJob.getJobState();
+            // job scheduler is only responsible for moving the job from pending -> scheduler
+            if (jobState == StatisticsJob.JobState.PENDING) {
+                pendingJob.setScheduleTime(System.currentTimeMillis());
+                try {
+                    List<StatisticsTask> tasks = this.divide(pendingJob);
+                    ArrayList<StatisticsTask> list = Lists.newArrayList();
+                    for (StatisticsTask task : tasks) {
+                        // TODO now only support meta task
+                        if (task instanceof MetaStatisticsTask){
+                            list.add(task);
+                        }
+                    }
+                    pendingJob.setTasks(list);
+                    Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(list);
+                    pendingJob.setJobState(StatisticsJob.JobState.SCHEDULING);
+                } catch (DdlException e) {
+                    pendingJob.setJobState(StatisticsJob.JobState.FAILED);
+                    LOG.info("Failed to schedule the statistical job. " + e.getMessage());
+                }
+            }
+        }
     }
 
     public void addPendingJob(StatisticsJob statisticsJob) throws IllegalStateException {
-        pendingJobQueue.add(statisticsJob);
+        this.pendingJobQueue.add(statisticsJob);
+    }
+
+    /**
+     * Statistics tasks are of the following types:
+     * table:
+     * - row_count: table row count are critical in estimating cardinality and memory usage of scan nodes.
+     * - data_size: table size, not applicable to CBO, mainly used to monitor and manage table size.
+     * column:
+     * - num_distinct_value: used to determine the selectivity of an equivalent expression.
+     * - min: The minimum value.
+     * - max: The maximum value.
+     * - num_nulls: number of nulls.
+     * - avg_col_len: the average length of a column, in bytes, is used for memory and network IO evaluation.
+     * - max_col_len: the Max length of the column, in bytes, is used for memory and network IO evaluation.
+     * <p>
+     * Divide:
+     * - min, max, ndv: These three full indicators are collected by a sub-task.
+     * - max_col_lens, avg_col_lens: Two sampling indicators were collected by a sub-task.
+     * <p>
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * <p>
+     * Eventually, we will get several subtasks of the following types:
+     *
+     * @throws DdlException DdlException
+     * @see MetaStatisticsTask
+     * @see SampleSQLStatisticsTask
+     * @see SQLStatisticsTask
+     */
+    private List<StatisticsTask> divide(StatisticsJob statisticsJob) throws DdlException {
+        long jobId = statisticsJob.getId();
+        long dbId = statisticsJob.getDbId();
+        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+        Set<Long> tableIds = statisticsJob.relatedTableId();
+        Map<Long, List<String>> tableIdToColumnName = statisticsJob.getTableIdToColumnName();
+        List<StatisticsTask> tasks = statisticsJob.getTasks();
+        List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
+
+        for (Long tableId : tableIds) {
+            Table tbl = db.getTableOrDdlException(tableId);
+            long rowCount = tbl.getRowCount();
+            List<Long> partitionIds = ((OlapTable) tbl).getPartitionIds();
+            List<String> columnNameList = tableIdToColumnName.get(tableId);
+
+            // step 1: generate data_size task
+            StatsCategoryDesc dataSizeCategory = this.getTblStatsCategoryDesc(dbId, tableId);
+            StatsGranularityDesc dataSizeGranularity = this.getTblStatsGranularityDesc(tableId);
+            MetaStatisticsTask dataSizeTask = new MetaStatisticsTask(jobId,
+                    dataSizeGranularity, dataSizeCategory, Collections.singletonList(StatsType.DATA_SIZE));
+            tasks.add(dataSizeTask);
+
+            // step 2: generate row_count task
+            KeysType keysType = ((OlapTable) tbl).getKeysType();
+            if (keysType == KeysType.DUP_KEYS) {
+                StatsCategoryDesc rowCountCategory = this.getTblStatsCategoryDesc(dbId, tableId);
+                StatsGranularityDesc rowCountGranularity = this.getTblStatsGranularityDesc(tableId);
+                MetaStatisticsTask metaTask = new MetaStatisticsTask(jobId,
+                        rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+                tasks.add(metaTask);
+            } else {
+                if (rowCount > backendIds.size() * 3700000000L) {

Review Comment:
   give names to magic numbers



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org