You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/04/06 08:49:28 UTC

[GitHub] [incubator-doris] weizhengte opened a new pull request, #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

weizhengte opened a new pull request, #8859:
URL: https://github.com/apache/incubator-doris/pull/8859

   # Proposed changes
   
   This pull request includes some implementations of the statistics(https://github.com/apache/incubator-doris/issues/6370), it will not affect any existing code and users will not be able to create statistics job.
   
   ## Problem Summary:
   
   Describe the overview of changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Has unit tests been added: (Yes/No/No Need)
   3. Has document been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r854838261


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    private long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    /**
+     * timeout of a statistics task
+     */
+    private long taskTimeout;
+
+    /**
+     * to be executed tasks.
+     */
+    private List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
+    }
+
+    public long getTaskTimeout() {
+        return taskTimeout;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public void setTasks(List<StatisticsTask> tasks) {
+        this.tasks = tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    private void setOptional(AnalyzeStmt stmt) {
+        if (stmt.getTaskTimeout() != -1) {
+            this.taskTimeout = stmt.getTaskTimeout();
+        }
+    }
+
+    public synchronized void updateJobState(JobState jobState) {
+        // PENDING -> SCHEDULING/FAILED/CANCELLED

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r854838195


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    private long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    /**
+     * timeout of a statistics task
+     */
+    private long taskTimeout;
+
+    /**
+     * to be executed tasks.
+     */
+    private List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
+    }
+
+    public long getTaskTimeout() {
+        return taskTimeout;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public void setTasks(List<StatisticsTask> tasks) {
+        this.tasks = tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    private void setOptional(AnalyzeStmt stmt) {
+        if (stmt.getTaskTimeout() != -1) {
+            this.taskTimeout = stmt.getTaskTimeout();
+        }
+    }
+
+    public synchronized void updateJobState(JobState jobState) {
+        // PENDING -> SCHEDULING/FAILED/CANCELLED
+        if (this.jobState == JobState.PENDING) {

Review Comment:
   ok



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    private long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    /**
+     * timeout of a statistics task
+     */
+    private long taskTimeout;
+
+    /**
+     * to be executed tasks.
+     */
+    private List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
+    }
+
+    public long getTaskTimeout() {
+        return taskTimeout;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public void setTasks(List<StatisticsTask> tasks) {
+        this.tasks = tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    private void setOptional(AnalyzeStmt stmt) {
+        if (stmt.getTaskTimeout() != -1) {
+            this.taskTimeout = stmt.getTaskTimeout();
+        }
+    }
+
+    public synchronized void updateJobState(JobState jobState) {
+        // PENDING -> SCHEDULING/FAILED/CANCELLED

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r856874191


##########
fe/fe-core/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1629,6 +1629,11 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int cbo_max_statistics_job_num = 20;
+    /*
+     * the max unfinished statistics task number
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int cbo_max_statistics_task_num = 512;

Review Comment:
   This may further control the number of statistics tasks, mainly for some tables with many columns, which divide too many statistics tasks, which may affect the performance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r843929072


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,217 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();
+            StatisticsJob.JobState jobState = pendingJob.getJobState();
+            // job scheduler is only responsible for moving the job from pending -> scheduler
+            if (jobState == StatisticsJob.JobState.PENDING) {
+                pendingJob.setScheduleTime(System.currentTimeMillis());
+                try {
+                    List<StatisticsTask> tasks = this.divide(pendingJob);
+                    ArrayList<StatisticsTask> list = Lists.newArrayList();
+                    for (StatisticsTask task : tasks) {
+                        // TODO now only support meta task
+                        if (task instanceof MetaStatisticsTask){
+                            list.add(task);
+                        }
+                    }
+                    pendingJob.setTasks(list);
+                    Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(list);
+                    pendingJob.setJobState(StatisticsJob.JobState.SCHEDULING);
+                } catch (DdlException e) {
+                    pendingJob.setJobState(StatisticsJob.JobState.FAILED);
+                    LOG.info("Failed to schedule the statistical job. " + e.getMessage());
+                }
+            }
+        }
     }
 
     public void addPendingJob(StatisticsJob statisticsJob) throws IllegalStateException {
-        pendingJobQueue.add(statisticsJob);
+        this.pendingJobQueue.add(statisticsJob);
+    }
+
+    /**
+     * Statistics tasks are of the following types:
+     * table:
+     * - row_count: table row count are critical in estimating cardinality and memory usage of scan nodes.
+     * - data_size: table size, not applicable to CBO, mainly used to monitor and manage table size.
+     * column:
+     * - num_distinct_value: used to determine the selectivity of an equivalent expression.
+     * - min: The minimum value.
+     * - max: The maximum value.
+     * - num_nulls: number of nulls.
+     * - avg_col_len: the average length of a column, in bytes, is used for memory and network IO evaluation.
+     * - max_col_len: the Max length of the column, in bytes, is used for memory and network IO evaluation.
+     * <p>
+     * Divide:
+     * - min, max, ndv: These three full indicators are collected by a sub-task.
+     * - max_col_lens, avg_col_lens: Two sampling indicators were collected by a sub-task.
+     * <p>
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * <p>
+     * Eventually, we will get several subtasks of the following types:
+     *
+     * @throws DdlException DdlException
+     * @see MetaStatisticsTask
+     * @see SampleSQLStatisticsTask
+     * @see SQLStatisticsTask
+     */
+    private List<StatisticsTask> divide(StatisticsJob statisticsJob) throws DdlException {
+        long jobId = statisticsJob.getId();
+        long dbId = statisticsJob.getDbId();
+        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+        Set<Long> tableIds = statisticsJob.relatedTableId();
+        Map<Long, List<String>> tableIdToColumnName = statisticsJob.getTableIdToColumnName();
+        List<StatisticsTask> tasks = statisticsJob.getTasks();
+        List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
+
+        for (Long tableId : tableIds) {
+            Table tbl = db.getTableOrDdlException(tableId);
+            long rowCount = tbl.getRowCount();
+            List<Long> partitionIds = ((OlapTable) tbl).getPartitionIds();
+            List<String> columnNameList = tableIdToColumnName.get(tableId);
+
+            // step 1: generate data_size task
+            StatsCategoryDesc dataSizeCategory = this.getTblStatsCategoryDesc(dbId, tableId);
+            StatsGranularityDesc dataSizeGranularity = this.getTblStatsGranularityDesc(tableId);
+            MetaStatisticsTask dataSizeTask = new MetaStatisticsTask(jobId,
+                    dataSizeGranularity, dataSizeCategory, Collections.singletonList(StatsType.DATA_SIZE));
+            tasks.add(dataSizeTask);
+
+            // step 2: generate row_count task
+            KeysType keysType = ((OlapTable) tbl).getKeysType();
+            if (keysType == KeysType.DUP_KEYS) {
+                StatsCategoryDesc rowCountCategory = this.getTblStatsCategoryDesc(dbId, tableId);
+                StatsGranularityDesc rowCountGranularity = this.getTblStatsGranularityDesc(tableId);
+                MetaStatisticsTask metaTask = new MetaStatisticsTask(jobId,
+                        rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+                tasks.add(metaTask);
+            } else {
+                if (rowCount > backendIds.size() * 3700000000L) {

Review Comment:
   got it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r856874191


##########
fe/fe-core/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1629,6 +1629,11 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int cbo_max_statistics_job_num = 20;
+    /*
+     * the max unfinished statistics task number
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int cbo_max_statistics_task_num = 512;

Review Comment:
   This further controls the number of statistical tasks, mainly for some tables with many columns. Too many statistical tasks are divided, which may affect performance. this problem can be avoided by this configuration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r854838032


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +47,211 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.poll();
+        if (pendingJob != null) {
+            try {
+                List<StatisticsTask> tasks = this.divide(pendingJob);

Review Comment:
   This task list is the same as pendingJob's task list, it get from pendingJob.getTasks().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r843930682


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java:
##########
@@ -35,39 +44,74 @@
 public class StatisticsJobManager {
     private static final Logger LOG = LogManager.getLogger(StatisticsJobManager.class);
 
-    // statistics job
-    private Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap();
+    /**
+     * save statistics job status information
+     */
+    private final Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap();
 
-    public void createStatisticsJob(AnalyzeStmt analyzeStmt) {
-        // step0: init statistics job by analyzeStmt
+    public Map<Long, StatisticsJob> getIdToStatisticsJob() {
+        return this.idToStatisticsJob;
+    }
+
+    public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws UserException {
+        // step1: init statistics job by analyzeStmt
         StatisticsJob statisticsJob = StatisticsJob.fromAnalyzeStmt(analyzeStmt);
-        // step1: get statistics to be analyzed
-        Set<Long> tableIdList = statisticsJob.relatedTableId();
+
         // step2: check restrict
-        checkRestrict(tableIdList);
-        // step3: check permission
-        checkPermission();
-        // step4: create it
-        createStatisticsJob(statisticsJob);
+        this.checkRestrict(statisticsJob.getDbId(), statisticsJob.relatedTableId());
+
+        // step3: create it
+        this.createStatisticsJob(statisticsJob);
     }
 
-    public void createStatisticsJob(StatisticsJob statisticsJob) {
-        idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
+    public void createStatisticsJob(StatisticsJob statisticsJob) throws DdlException {
+        this.idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
         try {
             Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob);
         } catch (IllegalStateException e) {
-            LOG.info("The pending statistics job is full. Please submit it again later.");
+            throw new DdlException("The pending statistics job is full, Please submit it again later.");
         }
     }
 
-    // Rule1: The same table cannot have two unfinished statistics jobs
-    // Rule2: The unfinished statistics job could not more then Config.max_statistics_job_num
-    // Rule3: The job for external table is not supported
-    private void checkRestrict(Set<Long> tableIdList) {
-        // TODO
-    }
+    /**
+     * The statistical job has the following restrict:
+     * - Rule1: The same table cannot have two unfinished statistics jobs
+     * - Rule2: The unfinished statistics job could not more then Config.max_statistics_job_num
+     * - Rule3: The job for external table is not supported
+     */
+    private synchronized void checkRestrict(long dbId, Set<Long> tableIds) throws AnalysisException {
+        Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId);
 
-    private void checkPermission() {
-        // TODO
+        // check table type
+        for (Long tableId : tableIds) {
+            Table table = db.getTableOrAnalysisException(tableId);
+            if (table.getType() != Table.TableType.OLAP) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_OLAP_TABLE, db.getFullName(),table.getName(), "ANALYZE");
+            }
+        }
+
+        int unfinishedJobs = 0;
+
+        // check table unfinished job
+        for (Map.Entry<Long, StatisticsJob> jobEntry : this.idToStatisticsJob.entrySet()) {
+            StatisticsJob statisticsJob = jobEntry.getValue();
+            StatisticsJob.JobState jobState = statisticsJob.getJobState();
+            List<Long> tableIdList = statisticsJob.getTableIds();
+            if (jobState == StatisticsJob.JobState.PENDING
+                    || jobState == StatisticsJob.JobState.SCHEDULING
+                    || jobState == StatisticsJob.JobState.RUNNING) {
+                for (Long tableId : tableIds) {
+                    if (tableIdList.contains(tableId)) {
+                        throw new AnalysisException("The table(id=" + tableId + ") have two unfinished statistics jobs");

Review Comment:
   ok, I'll remove it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r856874191


##########
fe/fe-core/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1629,6 +1629,11 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int cbo_max_statistics_job_num = 20;
+    /*
+     * the max unfinished statistics task number
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int cbo_max_statistics_task_num = 512;

Review Comment:
   This further controls the number of statistical tasks, mainly for some tables with many columns. Too many statistical tasks are divided, which may affect performance. this problem can be avoided by this configuration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] EmmyMiao87 commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r851042639


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,224 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();
+            StatisticsJob.JobState jobState = pendingJob.getJobState();
+            // job scheduler is only responsible for moving the job from pending -> scheduler
+            if (jobState == StatisticsJob.JobState.PENDING) {
+                try {
+                    List<StatisticsTask> tasks = this.divide(pendingJob);
+                    ArrayList<StatisticsTask> list = Lists.newArrayList();
+                    for (StatisticsTask task : tasks) {
+                        // TODO now only support meta task
+                        if (task instanceof MetaStatisticsTask) {
+                            list.add(task);
+                        }
+                    }
+                    pendingJob.setTasks(list);
+                    pendingJob.setJobState(StatisticsJob.JobState.SCHEDULING);

Review Comment:
   The action of modifying the state needs to be performed within the Statistics Job and the lock needs to be retained.



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,224 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();
+            StatisticsJob.JobState jobState = pendingJob.getJobState();
+            // job scheduler is only responsible for moving the job from pending -> scheduler
+            if (jobState == StatisticsJob.JobState.PENDING) {
+                try {
+                    List<StatisticsTask> tasks = this.divide(pendingJob);
+                    ArrayList<StatisticsTask> list = Lists.newArrayList();
+                    for (StatisticsTask task : tasks) {
+                        // TODO now only support meta task
+                        if (task instanceof MetaStatisticsTask) {
+                            list.add(task);
+                        }
+                    }
+                    pendingJob.setTasks(list);
+                    pendingJob.setJobState(StatisticsJob.JobState.SCHEDULING);
+                    Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(list);
+                } catch (DdlException e) {
+                    pendingJob.setJobState(StatisticsJob.JobState.FAILED);

Review Comment:
   same as above ~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] github-actions[bot] commented on pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#issuecomment-1109632931

   PR approved by at least one committer and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r852545050


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,224 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();
+            StatisticsJob.JobState jobState = pendingJob.getJobState();
+            // job scheduler is only responsible for moving the job from pending -> scheduler
+            if (jobState == StatisticsJob.JobState.PENDING) {
+                try {
+                    List<StatisticsTask> tasks = this.divide(pendingJob);
+                    ArrayList<StatisticsTask> list = Lists.newArrayList();

Review Comment:
   ok.



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,224 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();
+            StatisticsJob.JobState jobState = pendingJob.getJobState();
+            // job scheduler is only responsible for moving the job from pending -> scheduler
+            if (jobState == StatisticsJob.JobState.PENDING) {
+                try {
+                    List<StatisticsTask> tasks = this.divide(pendingJob);
+                    ArrayList<StatisticsTask> list = Lists.newArrayList();
+                    for (StatisticsTask task : tasks) {
+                        // TODO now only support meta task
+                        if (task instanceof MetaStatisticsTask) {
+                            list.add(task);
+                        }
+                    }
+                    pendingJob.setTasks(list);
+                    pendingJob.setJobState(StatisticsJob.JobState.SCHEDULING);

Review Comment:
   ok.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] EmmyMiao87 commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r855932006


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    private long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    /**
+     * timeout of a statistics task
+     */
+    private long taskTimeout;
+
+    /**
+     * to be executed tasks.
+     */
+    private List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
+    }
+
+    public long getTaskTimeout() {
+        return taskTimeout;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public void setTasks(List<StatisticsTask> tasks) {
+        this.tasks = tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    private void setOptional(AnalyzeStmt stmt) {
+        if (stmt.getTaskTimeout() != -1) {
+            this.taskTimeout = stmt.getTaskTimeout();
+        }
+    }
+
+    public synchronized void updateJobState(JobState jobState) {
+        // PENDING -> SCHEDULING/FAILED/CANCELLED
+        if (this.jobState == JobState.PENDING) {
+            if (jobState == JobState.SCHEDULING) {
+                this.jobState = JobState.SCHEDULING;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+            return;
+        }
+
+        // SCHEDULING -> RUNNING/FAILED/CANCELLED
+        if (this.jobState == JobState.SCHEDULING) {
+            if (jobState == JobState.RUNNING) {
+                this.jobState = JobState.RUNNING;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+            return;
+        }
+
+        // RUNNING -> FINISHED/FAILED/CANCELLED
+        if (this.jobState == JobState.RUNNING) {
+            if (jobState == JobState.FINISHED) {
+                this.jobState = JobState.FINISHED;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+        }

Review Comment:
   Both success and failure should be logged. This is the Job critical log.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] EmmyMiao87 commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r854742859


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    private long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    /**
+     * timeout of a statistics task
+     */
+    private long taskTimeout;
+
+    /**
+     * to be executed tasks.
+     */
+    private List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
+    }
+
+    public long getTaskTimeout() {
+        return taskTimeout;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public void setTasks(List<StatisticsTask> tasks) {
+        this.tasks = tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    private void setOptional(AnalyzeStmt stmt) {
+        if (stmt.getTaskTimeout() != -1) {
+            this.taskTimeout = stmt.getTaskTimeout();
+        }
+    }
+
+    public synchronized void updateJobState(JobState jobState) {
+        // PENDING -> SCHEDULING/FAILED/CANCELLED
+        if (this.jobState == JobState.PENDING) {

Review Comment:
   Also it should keep job lock when the state is switched .



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +47,211 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.poll();
+        if (pendingJob != null) {
+            try {
+                List<StatisticsTask> tasks = this.divide(pendingJob);

Review Comment:
   Put tasks into StatisticsJob at the same time



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    private long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    /**
+     * timeout of a statistics task
+     */
+    private long taskTimeout;
+
+    /**
+     * to be executed tasks.
+     */
+    private List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
+    }
+
+    public long getTaskTimeout() {
+        return taskTimeout;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public void setTasks(List<StatisticsTask> tasks) {
+        this.tasks = tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    private void setOptional(AnalyzeStmt stmt) {
+        if (stmt.getTaskTimeout() != -1) {
+            this.taskTimeout = stmt.getTaskTimeout();
+        }
+    }
+
+    public synchronized void updateJobState(JobState jobState) {
+        // PENDING -> SCHEDULING/FAILED/CANCELLED

Review Comment:
   If there is a invalid state switch, it will thrown exception 



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    private long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    /**
+     * timeout of a statistics task
+     */
+    private long taskTimeout;
+
+    /**
+     * to be executed tasks.
+     */
+    private List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
+    }
+
+    public long getTaskTimeout() {
+        return taskTimeout;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public void setTasks(List<StatisticsTask> tasks) {
+        this.tasks = tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    private void setOptional(AnalyzeStmt stmt) {
+        if (stmt.getTaskTimeout() != -1) {
+            this.taskTimeout = stmt.getTaskTimeout();
+        }
+    }
+
+    public synchronized void updateJobState(JobState jobState) {
+        // PENDING -> SCHEDULING/FAILED/CANCELLED
+        if (this.jobState == JobState.PENDING) {
+            if (jobState == JobState.SCHEDULING) {
+                this.jobState = JobState.SCHEDULING;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+            return;
+        }
+
+        // SCHEDULING -> RUNNING/FAILED/CANCELLED
+        if (this.jobState == JobState.SCHEDULING) {
+            if (jobState == JobState.RUNNING) {
+                this.jobState = JobState.RUNNING;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+            return;
+        }
+
+        // RUNNING -> FINISHED/FAILED/CANCELLED
+        if (this.jobState == JobState.RUNNING) {
+            if (jobState == JobState.FINISHED) {
+                this.jobState = JobState.FINISHED;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+        }

Review Comment:
   Add a LOG in here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] jackwener commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r857118324


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -18,46 +18,246 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.MasterDaemon;
 
 import com.google.common.collect.Queues;
 
-import java.util.ArrayList;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 
-/*
-Schedule statistics job.
-  1. divide job to multi task
-  2. submit all task to StatisticsTaskScheduler
-Switch job state from pending to scheduling.
+/**
+  * Schedule statistics job.
+  *     1. divide job to multi task
+  *     2. submit all task to StatisticsTaskScheduler
+  * Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
         StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        if (pendingJob != null) {
+            try {
+                if (pendingJob.getTasks().size() == 0) {
+                    divide(pendingJob);
+                }
+                List<StatisticsTask> tasks = pendingJob.getTasks();
+                Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(tasks);
+                pendingJob.updateJobState(StatisticsJob.JobState.SCHEDULING);
+                pendingJobQueue.remove();
+            } catch (IllegalStateException e) {
+                // throw IllegalStateException if the queue is full, re-add the tasks next time
+                LOG.info("The statistics task queue is full, schedule the job(id={}) later", pendingJob.getId());
+            } catch (DdlException e) {
+                pendingJobQueue.remove();
+                pendingJob.updateJobState(StatisticsJob.JobState.FAILED);
+                LOG.info("Failed to schedule the statistical job(id={})", pendingJob.getId(), e);
+            }
+        }
     }
 
     public void addPendingJob(StatisticsJob statisticsJob) throws IllegalStateException {
         pendingJobQueue.add(statisticsJob);
     }
 
+    /**
+     * Statistics tasks are of the following types:
+     * table:
+     * - row_count: table row count are critical in estimating cardinality and memory usage of scan nodes.
+     * - data_size: table size, not applicable to CBO, mainly used to monitor and manage table size.
+     * column:
+     * - num_distinct_value: used to determine the selectivity of an equivalent expression.
+     * - min: The minimum value.
+     * - max: The maximum value.
+     * - num_nulls: number of nulls.
+     * - avg_col_len: the average length of a column, in bytes, is used for memory and network IO evaluation.
+     * - max_col_len: the Max length of the column, in bytes, is used for memory and network IO evaluation.
+     * <p>
+     * Divide:
+     * - min, max, ndv: These three full indicators are collected by a sub-task.
+     * - max_col_lens, avg_col_lens: Two sampling indicators were collected by a sub-task.
+     * <p>
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * <p>
+     * Eventually, we will get several subtasks of the following types:
+     *
+     * @throws DdlException DdlException
+     * @see MetaStatisticsTask
+     * @see SampleSQLStatisticsTask
+     * @see SQLStatisticsTask
+     */
+    private void divide(StatisticsJob statisticsJob) throws DdlException {
+        long jobId = statisticsJob.getId();
+        long dbId = statisticsJob.getDbId();
+        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+        Set<Long> tblIds = statisticsJob.getTblIds();
+        Map<Long, List<String>> tableIdToColumnName = statisticsJob.getTableIdToColumnName();
+        List<StatisticsTask> tasks = statisticsJob.getTasks();
+        List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
+
+        for (Long tblId : tblIds) {
+            Table tbl = db.getTableOrDdlException(tblId);
+            long rowCount = tbl.getRowCount();
+            List<Long> partitionIds = ((OlapTable) tbl).getPartitionIds();
+            List<String> columnNameList = tableIdToColumnName.get(tblId);
+
+            // step 1: generate data_size task
+            StatsCategoryDesc dataSizeCategory = getTblStatsCategoryDesc(dbId, tblId);
+            StatsGranularityDesc dataSizeGranularity = getTblStatsGranularityDesc(tblId);
+            MetaStatisticsTask dataSizeTask = new MetaStatisticsTask(jobId,
+                    dataSizeGranularity, dataSizeCategory, Collections.singletonList(StatsType.DATA_SIZE));
+            tasks.add(dataSizeTask);
+
+            // step 2: generate row_count task
+            KeysType keysType = ((OlapTable) tbl).getKeysType();
+            if (keysType == KeysType.DUP_KEYS) {
+                StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
+                StatsGranularityDesc rowCountGranularity = getTblStatsGranularityDesc(tblId);
+                MetaStatisticsTask metaTask = new MetaStatisticsTask(jobId,
+                        rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+                tasks.add(metaTask);
+            } else {
+                if (rowCount > backendIds.size() * COUNT_MAX_SCAN_PER_TASK) {
+                    // divide subtasks by partition
+                    for (Long partitionId : partitionIds) {
+                        StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
+                        StatsGranularityDesc rowCountGranularity = getPartitionStatsGranularityDesc(tblId, partitionId);
+                        SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
+                                rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+                        tasks.add(sqlTask);
+                    }
+                } else {
+                    StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
+                    StatsGranularityDesc rowCountGranularity = getTblStatsGranularityDesc(tblId);
+                    SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
+                            rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+                    tasks.add(sqlTask);
+                }
+            }
+
+            // step 3: generate [min,max,ndv] task
+            if (rowCount > backendIds.size() * NDV_MAX_SCAN_PER_TASK) {
+                for (String columnName : columnNameList) {

Review Comment:
   It's more clear to use `columnNameList.foreach( ..... )`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] EmmyMiao87 commented on pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#issuecomment-1104673613

   The persistence logic is not yet complete.
   A separate PR is required to handle the persistence logic WIP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r857134997


##########
fe/fe-core/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1629,6 +1629,11 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int cbo_max_statistics_job_num = 20;
+    /*
+     * the max unfinished statistics task number
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int cbo_max_statistics_task_num = 512;

Review Comment:
   Here is a simple explanation.  we think it is easier for users to understand the limit of the number of jobs. In fact, the number of tasks is more reasonable, but users may not know how to divide tasks. after discussion, we choose the former.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morrySnow commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
morrySnow commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r843736906


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,187 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.UserException;
 
+import com.clearspring.analytics.util.Lists;
+import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.glassfish.jersey.internal.guava.Sets;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
 /*
 Used to store statistics job info,
 including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
-        CANCELLED
+        CANCELLED,
+        FAILED
     }
 
-    private long id = -1;
+    private final long id = Catalog.getCurrentCatalog().getNextId();

Review Comment:
   should we just assign id when the job could run?



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java:
##########
@@ -35,39 +44,74 @@
 public class StatisticsJobManager {
     private static final Logger LOG = LogManager.getLogger(StatisticsJobManager.class);
 
-    // statistics job
-    private Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap();
+    /**
+     * save statistics job status information
+     */
+    private final Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap();
 
-    public void createStatisticsJob(AnalyzeStmt analyzeStmt) {
-        // step0: init statistics job by analyzeStmt
+    public Map<Long, StatisticsJob> getIdToStatisticsJob() {
+        return this.idToStatisticsJob;
+    }
+
+    public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws UserException {
+        // step1: init statistics job by analyzeStmt
         StatisticsJob statisticsJob = StatisticsJob.fromAnalyzeStmt(analyzeStmt);
-        // step1: get statistics to be analyzed
-        Set<Long> tableIdList = statisticsJob.relatedTableId();
+
         // step2: check restrict
-        checkRestrict(tableIdList);
-        // step3: check permission
-        checkPermission();
-        // step4: create it
-        createStatisticsJob(statisticsJob);
+        this.checkRestrict(statisticsJob.getDbId(), statisticsJob.relatedTableId());
+
+        // step3: create it
+        this.createStatisticsJob(statisticsJob);
     }
 
-    public void createStatisticsJob(StatisticsJob statisticsJob) {
-        idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
+    public void createStatisticsJob(StatisticsJob statisticsJob) throws DdlException {
+        this.idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
         try {
             Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob);
         } catch (IllegalStateException e) {
-            LOG.info("The pending statistics job is full. Please submit it again later.");
+            throw new DdlException("The pending statistics job is full, Please submit it again later.");
         }
     }
 
-    // Rule1: The same table cannot have two unfinished statistics jobs
-    // Rule2: The unfinished statistics job could not more then Config.max_statistics_job_num
-    // Rule3: The job for external table is not supported
-    private void checkRestrict(Set<Long> tableIdList) {
-        // TODO
-    }
+    /**
+     * The statistical job has the following restrict:
+     * - Rule1: The same table cannot have two unfinished statistics jobs
+     * - Rule2: The unfinished statistics job could not more then Config.max_statistics_job_num
+     * - Rule3: The job for external table is not supported
+     */
+    private synchronized void checkRestrict(long dbId, Set<Long> tableIds) throws AnalysisException {
+        Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId);
 
-    private void checkPermission() {
-        // TODO
+        // check table type
+        for (Long tableId : tableIds) {
+            Table table = db.getTableOrAnalysisException(tableId);
+            if (table.getType() != Table.TableType.OLAP) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_OLAP_TABLE, db.getFullName(),table.getName(), "ANALYZE");
+            }
+        }
+
+        int unfinishedJobs = 0;
+
+        // check table unfinished job
+        for (Map.Entry<Long, StatisticsJob> jobEntry : this.idToStatisticsJob.entrySet()) {
+            StatisticsJob statisticsJob = jobEntry.getValue();
+            StatisticsJob.JobState jobState = statisticsJob.getJobState();
+            List<Long> tableIdList = statisticsJob.getTableIds();
+            if (jobState == StatisticsJob.JobState.PENDING
+                    || jobState == StatisticsJob.JobState.SCHEDULING
+                    || jobState == StatisticsJob.JobState.RUNNING) {
+                for (Long tableId : tableIds) {
+                    if (tableIdList.contains(tableId)) {
+                        throw new AnalysisException("The table(id=" + tableId + ") have two unfinished statistics jobs");

Review Comment:
   i think 'two unfinished job' lead to misunderstand here.
   remove two is better for me



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java:
##########
@@ -35,39 +44,74 @@
 public class StatisticsJobManager {
     private static final Logger LOG = LogManager.getLogger(StatisticsJobManager.class);
 
-    // statistics job
-    private Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap();
+    /**
+     * save statistics job status information
+     */
+    private final Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap();
 
-    public void createStatisticsJob(AnalyzeStmt analyzeStmt) {
-        // step0: init statistics job by analyzeStmt
+    public Map<Long, StatisticsJob> getIdToStatisticsJob() {
+        return this.idToStatisticsJob;
+    }
+
+    public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws UserException {
+        // step1: init statistics job by analyzeStmt
         StatisticsJob statisticsJob = StatisticsJob.fromAnalyzeStmt(analyzeStmt);
-        // step1: get statistics to be analyzed
-        Set<Long> tableIdList = statisticsJob.relatedTableId();
+
         // step2: check restrict
-        checkRestrict(tableIdList);
-        // step3: check permission
-        checkPermission();
-        // step4: create it
-        createStatisticsJob(statisticsJob);
+        this.checkRestrict(statisticsJob.getDbId(), statisticsJob.relatedTableId());
+
+        // step3: create it
+        this.createStatisticsJob(statisticsJob);
     }
 
-    public void createStatisticsJob(StatisticsJob statisticsJob) {
-        idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
+    public void createStatisticsJob(StatisticsJob statisticsJob) throws DdlException {
+        this.idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
         try {
             Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob);
         } catch (IllegalStateException e) {
-            LOG.info("The pending statistics job is full. Please submit it again later.");
+            throw new DdlException("The pending statistics job is full, Please submit it again later.");

Review Comment:
   i think we also need to log this info for problem tracking



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,217 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();
+            StatisticsJob.JobState jobState = pendingJob.getJobState();
+            // job scheduler is only responsible for moving the job from pending -> scheduler
+            if (jobState == StatisticsJob.JobState.PENDING) {
+                pendingJob.setScheduleTime(System.currentTimeMillis());
+                try {
+                    List<StatisticsTask> tasks = this.divide(pendingJob);
+                    ArrayList<StatisticsTask> list = Lists.newArrayList();
+                    for (StatisticsTask task : tasks) {
+                        // TODO now only support meta task
+                        if (task instanceof MetaStatisticsTask){

Review Comment:
   nit: blank after right parenthesis



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,217 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();
+            StatisticsJob.JobState jobState = pendingJob.getJobState();
+            // job scheduler is only responsible for moving the job from pending -> scheduler
+            if (jobState == StatisticsJob.JobState.PENDING) {
+                pendingJob.setScheduleTime(System.currentTimeMillis());
+                try {
+                    List<StatisticsTask> tasks = this.divide(pendingJob);
+                    ArrayList<StatisticsTask> list = Lists.newArrayList();
+                    for (StatisticsTask task : tasks) {
+                        // TODO now only support meta task
+                        if (task instanceof MetaStatisticsTask){
+                            list.add(task);
+                        }
+                    }
+                    pendingJob.setTasks(list);
+                    Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(list);
+                    pendingJob.setJobState(StatisticsJob.JobState.SCHEDULING);
+                } catch (DdlException e) {
+                    pendingJob.setJobState(StatisticsJob.JobState.FAILED);
+                    LOG.info("Failed to schedule the statistical job. " + e.getMessage());
+                }
+            }
+        }
     }
 
     public void addPendingJob(StatisticsJob statisticsJob) throws IllegalStateException {
-        pendingJobQueue.add(statisticsJob);
+        this.pendingJobQueue.add(statisticsJob);
+    }
+
+    /**
+     * Statistics tasks are of the following types:
+     * table:
+     * - row_count: table row count are critical in estimating cardinality and memory usage of scan nodes.
+     * - data_size: table size, not applicable to CBO, mainly used to monitor and manage table size.
+     * column:
+     * - num_distinct_value: used to determine the selectivity of an equivalent expression.
+     * - min: The minimum value.
+     * - max: The maximum value.
+     * - num_nulls: number of nulls.
+     * - avg_col_len: the average length of a column, in bytes, is used for memory and network IO evaluation.
+     * - max_col_len: the Max length of the column, in bytes, is used for memory and network IO evaluation.
+     * <p>
+     * Divide:
+     * - min, max, ndv: These three full indicators are collected by a sub-task.
+     * - max_col_lens, avg_col_lens: Two sampling indicators were collected by a sub-task.
+     * <p>
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * <p>
+     * Eventually, we will get several subtasks of the following types:
+     *
+     * @throws DdlException DdlException
+     * @see MetaStatisticsTask
+     * @see SampleSQLStatisticsTask
+     * @see SQLStatisticsTask
+     */
+    private List<StatisticsTask> divide(StatisticsJob statisticsJob) throws DdlException {
+        long jobId = statisticsJob.getId();
+        long dbId = statisticsJob.getDbId();
+        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+        Set<Long> tableIds = statisticsJob.relatedTableId();
+        Map<Long, List<String>> tableIdToColumnName = statisticsJob.getTableIdToColumnName();
+        List<StatisticsTask> tasks = statisticsJob.getTasks();
+        List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
+
+        for (Long tableId : tableIds) {
+            Table tbl = db.getTableOrDdlException(tableId);
+            long rowCount = tbl.getRowCount();
+            List<Long> partitionIds = ((OlapTable) tbl).getPartitionIds();
+            List<String> columnNameList = tableIdToColumnName.get(tableId);
+
+            // step 1: generate data_size task
+            StatsCategoryDesc dataSizeCategory = this.getTblStatsCategoryDesc(dbId, tableId);
+            StatsGranularityDesc dataSizeGranularity = this.getTblStatsGranularityDesc(tableId);
+            MetaStatisticsTask dataSizeTask = new MetaStatisticsTask(jobId,
+                    dataSizeGranularity, dataSizeCategory, Collections.singletonList(StatsType.DATA_SIZE));
+            tasks.add(dataSizeTask);
+
+            // step 2: generate row_count task
+            KeysType keysType = ((OlapTable) tbl).getKeysType();
+            if (keysType == KeysType.DUP_KEYS) {
+                StatsCategoryDesc rowCountCategory = this.getTblStatsCategoryDesc(dbId, tableId);
+                StatsGranularityDesc rowCountGranularity = this.getTblStatsGranularityDesc(tableId);
+                MetaStatisticsTask metaTask = new MetaStatisticsTask(jobId,
+                        rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+                tasks.add(metaTask);
+            } else {
+                if (rowCount > backendIds.size() * 3700000000L) {

Review Comment:
   give names to magic numbers



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] EmmyMiao87 merged pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 merged PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r855989344


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,248 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private final long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    private final Map<String, String> properties;
+
+    /**
+     * to be executed tasks.
+     */
+    private final List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
+
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName,
+                         Map<String, String> properties) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+        this.properties = properties == null ? Maps.newHashMap() : properties;
+    }
+
+    public void readLock() {
+        lock.readLock().lock();
+    }
+
+    public void readUnlock() {
+        lock.readLock().unlock();
+    }
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return this.errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void updateJobState(JobState newState) throws IllegalStateException {
+        writeLock();
+
+        try {
+            // PENDING -> SCHEDULING/FAILED/CANCELLED
+            if (jobState == JobState.PENDING) {
+                if (newState == JobState.SCHEDULING) {
+                    this.jobState = newState;
+                } else if (newState == JobState.FAILED) {
+                    this.jobState = JobState.FAILED;
+                    LOG.info("Statistics job(id={}) is failed", id);
+                } else if (newState == JobState.CANCELLED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) is cancelled.", id);
+                } else {
+                    LOG.info("Invalid job state transition from PENDING to " + newState);
+                    throw new IllegalStateException("Invalid job state transition from PENDING to " + newState);
+                }
+                return;
+            }
+
+            // SCHEDULING -> RUNNING/FAILED/CANCELLED
+            if (jobState == JobState.SCHEDULING) {
+                if (newState == JobState.RUNNING) {
+                    this.jobState = newState;
+                    // job start running, set start time
+                    this.startTime = System.currentTimeMillis();
+                } else if (newState == JobState.FAILED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) is failed", id);
+                } else if (newState == JobState.CANCELLED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) is cancelled.", id);
+                }  else {
+                    LOG.info("Invalid job state transition from SCHEDULING to " + newState);
+                    throw new IllegalStateException("Invalid job state transition from SCHEDULING to " + newState);
+                }
+                return;
+            }
+
+            // RUNNING -> FINISHED/FAILED/CANCELLED
+            if (jobState == JobState.RUNNING) {
+                if (newState == JobState.FINISHED) {
+                    // set finish time
+                    this.finishTime = System.currentTimeMillis();
+                    this.jobState = newState;
+                } else if (newState == JobState.FAILED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) is failed", id);
+                } else if (newState == JobState.CANCELLED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) is cancelled.", id);
+                }  else {
+                    LOG.info("Invalid job state transition from RUNNING to " + newState);
+                    throw new IllegalStateException("Invalid job state transition from RUNNING to " + newState);
+                }
+                return;
+            }
+
+            // unsupported transition
+            LOG.info("Invalid job(id={}) state transition from {} to {} ", id, jobState, newState);
+            throw new IllegalStateException("Invalid job state transition from " + jobState + " to " + newState);
+        } finally {
+            writeUnlock();
+        }

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r856130084


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    private long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    /**
+     * timeout of a statistics task
+     */
+    private long taskTimeout;
+
+    /**
+     * to be executed tasks.
+     */
+    private List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
+    }
+
+    public long getTaskTimeout() {
+        return taskTimeout;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public void setTasks(List<StatisticsTask> tasks) {
+        this.tasks = tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    private void setOptional(AnalyzeStmt stmt) {
+        if (stmt.getTaskTimeout() != -1) {
+            this.taskTimeout = stmt.getTaskTimeout();
+        }
+    }
+
+    public synchronized void updateJobState(JobState jobState) {
+        // PENDING -> SCHEDULING/FAILED/CANCELLED
+        if (this.jobState == JobState.PENDING) {
+            if (jobState == JobState.SCHEDULING) {
+                this.jobState = JobState.SCHEDULING;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+            return;
+        }
+
+        // SCHEDULING -> RUNNING/FAILED/CANCELLED
+        if (this.jobState == JobState.SCHEDULING) {
+            if (jobState == JobState.RUNNING) {
+                this.jobState = JobState.RUNNING;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+            return;
+        }
+
+        // RUNNING -> FINISHED/FAILED/CANCELLED
+        if (this.jobState == JobState.RUNNING) {
+            if (jobState == JobState.FINISHED) {
+                this.jobState = JobState.FINISHED;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+        }

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] EmmyMiao87 commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r856284415


##########
fe/fe-core/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1629,6 +1629,11 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int cbo_max_statistics_job_num = 20;
+    /*
+     * the max unfinished statistics task number
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int cbo_max_statistics_task_num = 512;

Review Comment:
   Do we still need to control the number of tasks?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r856107078


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    private long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    /**
+     * timeout of a statistics task
+     */
+    private long taskTimeout;
+
+    /**
+     * to be executed tasks.
+     */
+    private List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
+    }
+
+    public long getTaskTimeout() {
+        return taskTimeout;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public void setTasks(List<StatisticsTask> tasks) {
+        this.tasks = tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    private void setOptional(AnalyzeStmt stmt) {
+        if (stmt.getTaskTimeout() != -1) {
+            this.taskTimeout = stmt.getTaskTimeout();
+        }
+    }
+
+    public synchronized void updateJobState(JobState jobState) {
+        // PENDING -> SCHEDULING/FAILED/CANCELLED
+        if (this.jobState == JobState.PENDING) {
+            if (jobState == JobState.SCHEDULING) {
+                this.jobState = JobState.SCHEDULING;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+            return;
+        }
+
+        // SCHEDULING -> RUNNING/FAILED/CANCELLED
+        if (this.jobState == JobState.SCHEDULING) {
+            if (jobState == JobState.RUNNING) {
+                this.jobState = JobState.RUNNING;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+            return;
+        }
+
+        // RUNNING -> FINISHED/FAILED/CANCELLED
+        if (this.jobState == JobState.RUNNING) {
+            if (jobState == JobState.FINISHED) {
+                this.jobState = JobState.FINISHED;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+        }

Review Comment:
   This willprint log in the method of job status update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] EmmyMiao87 commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r850339377


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,224 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();

Review Comment:
   Use poll without remove



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,224 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();
+            StatisticsJob.JobState jobState = pendingJob.getJobState();
+            // job scheduler is only responsible for moving the job from pending -> scheduler
+            if (jobState == StatisticsJob.JobState.PENDING) {
+                try {
+                    List<StatisticsTask> tasks = this.divide(pendingJob);
+                    ArrayList<StatisticsTask> list = Lists.newArrayList();

Review Comment:
   There is no need to write this kind of temporary code, anyway, the scheduler will not actually run.



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +49,224 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.peek();
+        if (pendingJob != null) {
+            this.pendingJobQueue.remove();

Review Comment:
   delete it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r854889779


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    private long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    /**
+     * timeout of a statistics task
+     */
+    private long taskTimeout;
+
+    /**
+     * to be executed tasks.
+     */
+    private List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
+    }
+
+    public long getTaskTimeout() {
+        return taskTimeout;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public void setTasks(List<StatisticsTask> tasks) {
+        this.tasks = tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    private void setOptional(AnalyzeStmt stmt) {
+        if (stmt.getTaskTimeout() != -1) {
+            this.taskTimeout = stmt.getTaskTimeout();
+        }
+    }
+
+    public synchronized void updateJobState(JobState jobState) {
+        // PENDING -> SCHEDULING/FAILED/CANCELLED

Review Comment:
   It may be better to log instead of exceptions. For example, if multiple tasks fail, then there may be job status updates from FAILED to FAILED. This is actually a normal behavior (we don’t need to do anything), adding Exceptions may make the code complicated~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r843929670


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java:
##########
@@ -35,39 +44,74 @@
 public class StatisticsJobManager {
     private static final Logger LOG = LogManager.getLogger(StatisticsJobManager.class);
 
-    // statistics job
-    private Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap();
+    /**
+     * save statistics job status information
+     */
+    private final Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap();
 
-    public void createStatisticsJob(AnalyzeStmt analyzeStmt) {
-        // step0: init statistics job by analyzeStmt
+    public Map<Long, StatisticsJob> getIdToStatisticsJob() {
+        return this.idToStatisticsJob;
+    }
+
+    public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws UserException {
+        // step1: init statistics job by analyzeStmt
         StatisticsJob statisticsJob = StatisticsJob.fromAnalyzeStmt(analyzeStmt);
-        // step1: get statistics to be analyzed
-        Set<Long> tableIdList = statisticsJob.relatedTableId();
+
         // step2: check restrict
-        checkRestrict(tableIdList);
-        // step3: check permission
-        checkPermission();
-        // step4: create it
-        createStatisticsJob(statisticsJob);
+        this.checkRestrict(statisticsJob.getDbId(), statisticsJob.relatedTableId());
+
+        // step3: create it
+        this.createStatisticsJob(statisticsJob);
     }
 
-    public void createStatisticsJob(StatisticsJob statisticsJob) {
-        idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
+    public void createStatisticsJob(StatisticsJob statisticsJob) throws DdlException {
+        this.idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
         try {
             Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob);
         } catch (IllegalStateException e) {
-            LOG.info("The pending statistics job is full. Please submit it again later.");
+            throw new DdlException("The pending statistics job is full, Please submit it again later.");

Review Comment:
   ok~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r844705817


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,187 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.UserException;
 
+import com.clearspring.analytics.util.Lists;
+import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.glassfish.jersey.internal.guava.Sets;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
 /*
 Used to store statistics job info,
 including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
-        CANCELLED
+        CANCELLED,
+        FAILED
     }
 
-    private long id = -1;
+    private final long id = Catalog.getCurrentCatalog().getNextId();

Review Comment:
   sure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r854889779


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    private long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    /**
+     * timeout of a statistics task
+     */
+    private long taskTimeout;
+
+    /**
+     * to be executed tasks.
+     */
+    private List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
+    }
+
+    public long getTaskTimeout() {
+        return taskTimeout;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public void setTasks(List<StatisticsTask> tasks) {
+        this.tasks = tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    private void setOptional(AnalyzeStmt stmt) {
+        if (stmt.getTaskTimeout() != -1) {
+            this.taskTimeout = stmt.getTaskTimeout();
+        }
+    }
+
+    public synchronized void updateJobState(JobState jobState) {
+        // PENDING -> SCHEDULING/FAILED/CANCELLED

Review Comment:
   ok~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] github-actions[bot] commented on pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#issuecomment-1109632994

   PR approved by anyone and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizhengte commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
weizhengte commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r857140673


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -18,46 +18,246 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.MasterDaemon;
 
 import com.google.common.collect.Queues;
 
-import java.util.ArrayList;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 
-/*
-Schedule statistics job.
-  1. divide job to multi task
-  2. submit all task to StatisticsTaskScheduler
-Switch job state from pending to scheduling.
+/**
+  * Schedule statistics job.
+  *     1. divide job to multi task
+  *     2. submit all task to StatisticsTaskScheduler
+  * Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * if all statistics be collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into the queue
+     * and schedule them one by one, and finally divide each job to several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
         StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        if (pendingJob != null) {
+            try {
+                if (pendingJob.getTasks().size() == 0) {
+                    divide(pendingJob);
+                }
+                List<StatisticsTask> tasks = pendingJob.getTasks();
+                Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(tasks);
+                pendingJob.updateJobState(StatisticsJob.JobState.SCHEDULING);
+                pendingJobQueue.remove();
+            } catch (IllegalStateException e) {
+                // throw IllegalStateException if the queue is full, re-add the tasks next time
+                LOG.info("The statistics task queue is full, schedule the job(id={}) later", pendingJob.getId());
+            } catch (DdlException e) {
+                pendingJobQueue.remove();
+                pendingJob.updateJobState(StatisticsJob.JobState.FAILED);
+                LOG.info("Failed to schedule the statistical job(id={})", pendingJob.getId(), e);
+            }
+        }
     }
 
     public void addPendingJob(StatisticsJob statisticsJob) throws IllegalStateException {
         pendingJobQueue.add(statisticsJob);
     }
 
+    /**
+     * Statistics tasks are of the following types:
+     * table:
+     * - row_count: table row count are critical in estimating cardinality and memory usage of scan nodes.
+     * - data_size: table size, not applicable to CBO, mainly used to monitor and manage table size.
+     * column:
+     * - num_distinct_value: used to determine the selectivity of an equivalent expression.
+     * - min: The minimum value.
+     * - max: The maximum value.
+     * - num_nulls: number of nulls.
+     * - avg_col_len: the average length of a column, in bytes, is used for memory and network IO evaluation.
+     * - max_col_len: the Max length of the column, in bytes, is used for memory and network IO evaluation.
+     * <p>
+     * Divide:
+     * - min, max, ndv: These three full indicators are collected by a sub-task.
+     * - max_col_lens, avg_col_lens: Two sampling indicators were collected by a sub-task.
+     * <p>
+     * If the table row-count is greater than the maximum number of Be scans for a single BE,
+     * we'll divide subtasks by partition. relevant values(3700000000L&600000000L) are derived from test.
+     * <p>
+     * Eventually, we will get several subtasks of the following types:
+     *
+     * @throws DdlException DdlException
+     * @see MetaStatisticsTask
+     * @see SampleSQLStatisticsTask
+     * @see SQLStatisticsTask
+     */
+    private void divide(StatisticsJob statisticsJob) throws DdlException {
+        long jobId = statisticsJob.getId();
+        long dbId = statisticsJob.getDbId();
+        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+        Set<Long> tblIds = statisticsJob.getTblIds();
+        Map<Long, List<String>> tableIdToColumnName = statisticsJob.getTableIdToColumnName();
+        List<StatisticsTask> tasks = statisticsJob.getTasks();
+        List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
+
+        for (Long tblId : tblIds) {
+            Table tbl = db.getTableOrDdlException(tblId);
+            long rowCount = tbl.getRowCount();
+            List<Long> partitionIds = ((OlapTable) tbl).getPartitionIds();
+            List<String> columnNameList = tableIdToColumnName.get(tblId);
+
+            // step 1: generate data_size task
+            StatsCategoryDesc dataSizeCategory = getTblStatsCategoryDesc(dbId, tblId);
+            StatsGranularityDesc dataSizeGranularity = getTblStatsGranularityDesc(tblId);
+            MetaStatisticsTask dataSizeTask = new MetaStatisticsTask(jobId,
+                    dataSizeGranularity, dataSizeCategory, Collections.singletonList(StatsType.DATA_SIZE));
+            tasks.add(dataSizeTask);
+
+            // step 2: generate row_count task
+            KeysType keysType = ((OlapTable) tbl).getKeysType();
+            if (keysType == KeysType.DUP_KEYS) {
+                StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
+                StatsGranularityDesc rowCountGranularity = getTblStatsGranularityDesc(tblId);
+                MetaStatisticsTask metaTask = new MetaStatisticsTask(jobId,
+                        rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+                tasks.add(metaTask);
+            } else {
+                if (rowCount > backendIds.size() * COUNT_MAX_SCAN_PER_TASK) {
+                    // divide subtasks by partition
+                    for (Long partitionId : partitionIds) {
+                        StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
+                        StatsGranularityDesc rowCountGranularity = getPartitionStatsGranularityDesc(tblId, partitionId);
+                        SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
+                                rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+                        tasks.add(sqlTask);
+                    }
+                } else {
+                    StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
+                    StatsGranularityDesc rowCountGranularity = getTblStatsGranularityDesc(tblId);
+                    SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
+                            rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+                    tasks.add(sqlTask);
+                }
+            }
+
+            // step 3: generate [min,max,ndv] task
+            if (rowCount > backendIds.size() * NDV_MAX_SCAN_PER_TASK) {
+                for (String columnName : columnNameList) {

Review Comment:
   That's true



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] EmmyMiao87 commented on a diff in pull request #8859: [feature-wip](statistics) step2: schedule the statistics job and generate executable tasks

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r855962371


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,248 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private final long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    private final Map<String, String> properties;
+
+    /**
+     * to be executed tasks.
+     */
+    private final List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
+
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName,
+                         Map<String, String> properties) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+        this.properties = properties == null ? Maps.newHashMap() : properties;
+    }
+
+    public void readLock() {
+        lock.readLock().lock();
+    }
+
+    public void readUnlock() {
+        lock.readLock().unlock();
+    }
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return this.errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void updateJobState(JobState newState) throws IllegalStateException {
+        writeLock();
+
+        try {
+            // PENDING -> SCHEDULING/FAILED/CANCELLED
+            if (jobState == JobState.PENDING) {
+                if (newState == JobState.SCHEDULING) {
+                    this.jobState = newState;
+                } else if (newState == JobState.FAILED) {
+                    this.jobState = JobState.FAILED;
+                    LOG.info("Statistics job(id={}) is failed", id);
+                } else if (newState == JobState.CANCELLED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) is cancelled.", id);
+                } else {
+                    LOG.info("Invalid job state transition from PENDING to " + newState);
+                    throw new IllegalStateException("Invalid job state transition from PENDING to " + newState);
+                }
+                return;
+            }
+
+            // SCHEDULING -> RUNNING/FAILED/CANCELLED
+            if (jobState == JobState.SCHEDULING) {
+                if (newState == JobState.RUNNING) {
+                    this.jobState = newState;
+                    // job start running, set start time
+                    this.startTime = System.currentTimeMillis();
+                } else if (newState == JobState.FAILED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) is failed", id);
+                } else if (newState == JobState.CANCELLED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) is cancelled.", id);
+                }  else {
+                    LOG.info("Invalid job state transition from SCHEDULING to " + newState);
+                    throw new IllegalStateException("Invalid job state transition from SCHEDULING to " + newState);
+                }
+                return;
+            }
+
+            // RUNNING -> FINISHED/FAILED/CANCELLED
+            if (jobState == JobState.RUNNING) {
+                if (newState == JobState.FINISHED) {
+                    // set finish time
+                    this.finishTime = System.currentTimeMillis();
+                    this.jobState = newState;
+                } else if (newState == JobState.FAILED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) is failed", id);
+                } else if (newState == JobState.CANCELLED) {
+                    this.jobState = newState;
+                    LOG.info("Statistics job(id={}) is cancelled.", id);
+                }  else {
+                    LOG.info("Invalid job state transition from RUNNING to " + newState);
+                    throw new IllegalStateException("Invalid job state transition from RUNNING to " + newState);
+                }
+                return;
+            }
+
+            // unsupported transition
+            LOG.info("Invalid job(id={}) state transition from {} to {} ", id, jobState, newState);
+            throw new IllegalStateException("Invalid job state transition from " + jobState + " to " + newState);
+        } finally {
+            writeUnlock();
+        }

Review Comment:
   Print a log of normal state transitions after the lock is released.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org