Posted to commits@doris.apache.org by "morrySnow (via GitHub)" <gi...@apache.org> on 2023/04/10 05:34:01 UTC

[GitHub] [doris] morrySnow commented on a diff in pull request #18502: [enhancement](stats) Stats preheating as FE booted

morrySnow commented on code in PR #18502:
URL: https://github.com/apache/doris/pull/18502#discussion_r1161416835


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java:
##########
@@ -17,13 +17,11 @@
 
 package org.apache.doris.statistics;
 
-import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
+import org.apache.doris.statistics.util.StatisticsUtil;
 
-import com.google.common.base.Preconditions;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

Review Comment:
   Should we change this to `org.apache.logging.log4j.Logger`, like what `AnalysisManager` uses?
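
   For illustration, a minimal sketch of what that change could look like, assuming `AnalysisManager` uses the log4j2 API (not a definitive patch for this file):

    ```java
    // Sketch only: switch from the log4j 1.x imports to the log4j2 API.
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    public class AnalysisTaskScheduler {
        private static final Logger LOG = LogManager.getLogger(AnalysisTaskScheduler.class);
    }
    ```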



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java:
##########
@@ -154,7 +154,8 @@ public static ColumnStatistic fromResultRow(ResultRow resultRow) {
             }
             columnStatisticBuilder.setSelectivity(1.0);
             columnStatisticBuilder.setOriginalNdv(ndv);
-            Histogram histogram = Env.getCurrentEnv().getStatisticsCache().getHistogram(tblId, idxId, colName);
+            Histogram histogram = Env.getCurrentEnv().getStatisticsCache().getHistogram(tblId, idxId, colName)

Review Comment:
   Maybe the better way is: one class for caching column stats, one class for caching histograms, and a new class for stats derivation that contains both the column stats and the histogram.
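
   A rough sketch of that split (the holder class and its name are hypothetical, not part of this PR): each cache keeps its own value type, and a derive-time holder carries both:

    ```java
    // Hypothetical sketch: a derive-time holder combining the two cached values.
    // ColumnStatistic and Histogram are the existing statistics classes; the
    // name StatsDeriveContext is made up for illustration.
    public class StatsDeriveContext {
        private final ColumnStatistic columnStatistic;
        private final Histogram histogram;

        public StatsDeriveContext(ColumnStatistic columnStatistic, Histogram histogram) {
            this.columnStatistic = columnStatistic;
            this.histogram = histogram;
        }

        public ColumnStatistic getColumnStatistic() {
            return columnStatistic;
        }

        public Histogram getHistogram() {
            return histogram;
        }
    }
    ```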



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java:
##########
@@ -44,85 +47,155 @@ public class StatisticsCache {
             = ThreadPoolManager.newDaemonFixedThreadPool(
             10, Integer.MAX_VALUE, "STATS_FETCH", true);
 
-    private final StatisticsCacheLoader cacheLoader = new StatisticsCacheLoader();
-
-    private final AsyncLoadingCache<StatisticsCacheKey, ColumnLevelStatisticCache> cache = Caffeine.newBuilder()
-            .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
-            .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
-            .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
-            .executor(threadPool)
-            .buildAsync(cacheLoader);
+    private final ColumnStatisticsCacheLoader columnStatisticsCacheLoader = new ColumnStatisticsCacheLoader();
+    private final HistogramCacheLoader histogramCacheLoader = new HistogramCacheLoader();
+
+    private final AsyncLoadingCache<StatisticsCacheKey, Optional<ColumnStatistic>> columnStatisticsCache =
+            Caffeine.newBuilder()
+                    .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
+                    .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
+                    .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
+                    .executor(threadPool)
+                    .buildAsync(columnStatisticsCacheLoader);
+
+    private final AsyncLoadingCache<StatisticsCacheKey, Optional<Histogram>> histogramCache =
+            Caffeine.newBuilder()
+                    .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
+                    .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
+                    .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
+                    .executor(threadPool)
+                    .buildAsync(histogramCacheLoader);
 
     {
         threadPool.submit(() -> {
             while (true) {
                 try {
-                    cacheLoader.removeExpiredInProgressing();
-                    Thread.sleep(TimeUnit.MINUTES.toMillis(15));
+                    columnStatisticsCacheLoader.removeExpiredInProgressing();
+                    histogramCacheLoader.removeExpiredInProgressing();
                 } catch (Throwable t) {
                     // IGNORE
                 }
+                Thread.sleep(TimeUnit.MINUTES.toMillis(15));
             }
 
         });
     }
 
     public ColumnStatistic getColumnStatistics(long tblId, String colName) {
-        ColumnLevelStatisticCache columnLevelStatisticCache = getColumnStatistics(tblId, -1, colName);
-        if (columnLevelStatisticCache == null) {
-            return ColumnStatistic.UNKNOWN;
-        }
-        return columnLevelStatisticCache.columnStatistic;
+        return getColumnStatistics(tblId, -1, colName).orElse(ColumnStatistic.UNKNOWN);
     }
 
-    public ColumnLevelStatisticCache getColumnStatistics(long tblId, long idxId, String colName) {
+    public Optional<ColumnStatistic> getColumnStatistics(long tblId, long idxId, String colName) {
         ConnectContext ctx = ConnectContext.get();
         if (ctx != null && ctx.getSessionVariable().internalSession) {
             return null;
         }
         StatisticsCacheKey k = new StatisticsCacheKey(tblId, idxId, colName);
         try {
-            CompletableFuture<ColumnLevelStatisticCache> f = cache.get(k);
+            CompletableFuture<Optional<ColumnStatistic>> f = columnStatisticsCache.get(k);
             if (f.isDone() && f.get() != null) {
                 return f.get();
             }
         } catch (Exception e) {
             LOG.warn("Unexpected exception while returning ColumnStatistic", e);
         }
-        return null;
+        return Optional.empty();
     }
 
     public Histogram getHistogram(long tblId, String colName) {
-        return getHistogram(tblId, -1, colName);
+        return getHistogram(tblId, -1, colName).orElse(null);
     }
 
-    public Histogram getHistogram(long tblId, long idxId, String colName) {
+    public Optional<Histogram> getHistogram(long tblId, long idxId, String colName) {
         ConnectContext ctx = ConnectContext.get();
         if (ctx != null && ctx.getSessionVariable().internalSession) {
-            return null;
+            return Optional.empty();
         }
         StatisticsCacheKey k = new StatisticsCacheKey(tblId, idxId, colName);
         try {
-            CompletableFuture<ColumnLevelStatisticCache> f = cache.get(k);
+            CompletableFuture<Optional<Histogram>> f = histogramCache.get(k);
             if (f.isDone() && f.get() != null) {
-                return f.get().getHistogram();
+                return f.get();
             }
         } catch (Exception e) {
             LOG.warn("Unexpected exception while returning Histogram", e);
         }
-        return null;
+        return Optional.empty();
     }
 
     // TODO: finish this method.
     public void eraseExpiredCache(long tblId, long idxId, String colName) {
-        cache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName));
+        columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName));
     }
 
-    public void updateCache(long tblId, long idxId, String colName, ColumnLevelStatisticCache statistic) {
-        cache.synchronous().put(new StatisticsCacheKey(tblId, idxId, colName), statistic);
+    public void updateCache(long tblId, long idxId, String colName, ColumnStatistic statistic) {
+        columnStatisticsCache.synchronous().put(new StatisticsCacheKey(tblId, idxId, colName), Optional.of(statistic));
     }
 
     public void refreshSync(long tblId, long idxId, String colName) {

Review Comment:
   Histogram update and refresh should not call these functions.
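
   For example, the histogram path could get its own entry points instead (method names are hypothetical; the cache fields are the ones introduced in this diff):

    ```java
    // Hypothetical sketch: route histogram update/refresh to histogramCache
    // instead of columnStatisticsCache.
    public void updateHistogramCache(long tblId, long idxId, String colName, Histogram histogram) {
        histogramCache.synchronous().put(new StatisticsCacheKey(tblId, idxId, colName), Optional.of(histogram));
    }

    public void refreshHistogramSync(long tblId, long idxId, String colName) {
        histogramCache.synchronous().refresh(new StatisticsCacheKey(tblId, idxId, colName));
    }
    ```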



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java:
##########
@@ -44,85 +47,159 @@ public class StatisticsCache {
             = ThreadPoolManager.newDaemonFixedThreadPool(
             10, Integer.MAX_VALUE, "STATS_FETCH", true);
 
-    private final StatisticsCacheLoader cacheLoader = new StatisticsCacheLoader();
-
-    private final AsyncLoadingCache<StatisticsCacheKey, ColumnLevelStatisticCache> cache = Caffeine.newBuilder()
-            .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
-            .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
-            .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
-            .executor(threadPool)
-            .buildAsync(cacheLoader);
+    private final ColumnStatisticsCacheLoader columnStatisticsCacheLoader = new ColumnStatisticsCacheLoader();
+    private final HistogramCacheLoader histogramCacheLoader = new HistogramCacheLoader();
+
+    private final AsyncLoadingCache<StatisticsCacheKey, Optional<ColumnStatistic>> columnStatisticsCache =
+            Caffeine.newBuilder()
+                    .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
+                    .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
+                    .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
+                    .executor(threadPool)
+                    .buildAsync(columnStatisticsCacheLoader);
+
+    private final AsyncLoadingCache<StatisticsCacheKey, Optional<Histogram>> histogramCache =
+            Caffeine.newBuilder()
+                    .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
+                    .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
+                    .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
+                    .executor(threadPool)
+                    .buildAsync(histogramCacheLoader);
 
     {
         threadPool.submit(() -> {
             while (true) {
                 try {
-                    cacheLoader.removeExpiredInProgressing();
-                    Thread.sleep(TimeUnit.MINUTES.toMillis(15));
+                    columnStatisticsCacheLoader.removeExpiredInProgressing();
+                    histogramCacheLoader.removeExpiredInProgressing();
                 } catch (Throwable t) {
                     // IGNORE
                 }
+                Thread.sleep(TimeUnit.MINUTES.toMillis(15));
             }
 
         });
     }
 
     public ColumnStatistic getColumnStatistics(long tblId, String colName) {
-        ColumnLevelStatisticCache columnLevelStatisticCache = getColumnStatistics(tblId, -1, colName);
-        if (columnLevelStatisticCache == null) {
-            return ColumnStatistic.UNKNOWN;
-        }
-        return columnLevelStatisticCache.columnStatistic;
+        return getColumnStatistics(tblId, -1, colName).orElse(ColumnStatistic.UNKNOWN);
     }
 
-    public ColumnLevelStatisticCache getColumnStatistics(long tblId, long idxId, String colName) {
+    public Optional<ColumnStatistic> getColumnStatistics(long tblId, long idxId, String colName) {
         ConnectContext ctx = ConnectContext.get();
         if (ctx != null && ctx.getSessionVariable().internalSession) {
             return null;
         }
         StatisticsCacheKey k = new StatisticsCacheKey(tblId, idxId, colName);
         try {
-            CompletableFuture<ColumnLevelStatisticCache> f = cache.get(k);
+            CompletableFuture<Optional<ColumnStatistic>> f = columnStatisticsCache.get(k);
             if (f.isDone() && f.get() != null) {
                 return f.get();
             }
         } catch (Exception e) {
             LOG.warn("Unexpected exception while returning ColumnStatistic", e);
         }
-        return null;
+        return Optional.empty();
     }
 
     public Histogram getHistogram(long tblId, String colName) {
-        return getHistogram(tblId, -1, colName);
+        return getHistogram(tblId, -1, colName).orElse(null);
     }
 
-    public Histogram getHistogram(long tblId, long idxId, String colName) {
+    public Optional<Histogram> getHistogram(long tblId, long idxId, String colName) {
         ConnectContext ctx = ConnectContext.get();
         if (ctx != null && ctx.getSessionVariable().internalSession) {
-            return null;
+            return Optional.empty();
         }
         StatisticsCacheKey k = new StatisticsCacheKey(tblId, idxId, colName);
         try {
-            CompletableFuture<ColumnLevelStatisticCache> f = cache.get(k);
+            CompletableFuture<Optional<Histogram>> f = histogramCache.get(k);
             if (f.isDone() && f.get() != null) {
-                return f.get().getHistogram();
+                return f.get();
             }
         } catch (Exception e) {
             LOG.warn("Unexpected exception while returning Histogram", e);
         }
-        return null;
+        return Optional.empty();
     }
 
     // TODO: finish this method.
     public void eraseExpiredCache(long tblId, long idxId, String colName) {
-        cache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName));
+        columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName));
     }
 
-    public void updateCache(long tblId, long idxId, String colName, ColumnLevelStatisticCache statistic) {
-        cache.synchronous().put(new StatisticsCacheKey(tblId, idxId, colName), statistic);
+    public void updateCache(long tblId, long idxId, String colName, ColumnStatistic statistic) {

Review Comment:
   ```suggestion
       public void updateColStatsCache(long tblId, long idxId, String colName, ColumnStatistic statistic) {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java:
##########
@@ -48,18 +46,19 @@ public class AnalysisTaskScheduler {
     private final Set<BaseAnalysisTask> manualJobSet = new HashSet<>();
 
     public synchronized void schedule(AnalysisTaskInfo analysisJobInfo) {
-        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(analysisJobInfo.catalogName);
-        Preconditions.checkArgument(catalog != null);
-        DatabaseIf db = catalog.getDbNullable(analysisJobInfo.dbName);
-        Preconditions.checkArgument(db != null);
-        TableIf table = db.getTableNullable(analysisJobInfo.tblName);
-        Preconditions.checkArgument(table != null);
-        BaseAnalysisTask analysisTask = table.createAnalysisTask(this, analysisJobInfo);
-        addToManualJobQueue(analysisTask);
-        if (analysisJobInfo.jobType.equals(JobType.MANUAL)) {
-            return;
+        try {
+            TableIf table = StatisticsUtil.findTable(analysisJobInfo.catalogName,
+                    analysisJobInfo.dbName, analysisJobInfo.tblName);
+            BaseAnalysisTask analysisTask = table.createAnalysisTask(this, analysisJobInfo);
+            addToManualJobQueue(analysisTask);
+            if (analysisJobInfo.jobType.equals(JobType.MANUAL)) {
+                return;
+            }
+            addToSystemQueue(analysisTask);

Review Comment:
   Why add a system-type job to the manual queue?



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java:
##########
@@ -104,7 +115,16 @@ public BaseAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTas
         init(info);
     }
 
+    protected void initUnsupportedType() {
+        unsupportedType.add(Type.HLL);
+        unsupportedType.add(Type.BITMAP);
+        unsupportedType.add(Type.ARRAY);
+        unsupportedType.add(Type.MAP);

Review Comment:
   I think you should use the `primitive type` to do this, because a complex type's `equals` function compares its items.
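
   Something along these lines, assuming `Type#getPrimitiveType()` is available and `PrimitiveType` has entries for these types (sketch only, not tested against this branch):

    ```java
    // Sketch: match on the primitive type rather than the full Type object,
    // so complex types are recognized without item-by-item equals comparison.
    // Assumes java.util.EnumSet and java.util.Set are imported.
    protected static final Set<PrimitiveType> UNSUPPORTED_TYPES = EnumSet.of(
            PrimitiveType.HLL,
            PrimitiveType.BITMAP,
            PrimitiveType.ARRAY,
            PrimitiveType.MAP,
            PrimitiveType.JSONB);

    protected boolean isUnsupportedType(Type type) {
        return UNSUPPORTED_TYPES.contains(type.getPrimitiveType());
    }
    ```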



##########
fe/fe-core/src/test/java/org/apache/doris/nereids/stats/TPCHStats.java:
##########


Review Comment:
   Why was this test removed?



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java:
##########
@@ -44,85 +47,155 @@ public class StatisticsCache {
             = ThreadPoolManager.newDaemonFixedThreadPool(
             10, Integer.MAX_VALUE, "STATS_FETCH", true);
 
-    private final StatisticsCacheLoader cacheLoader = new StatisticsCacheLoader();
-
-    private final AsyncLoadingCache<StatisticsCacheKey, ColumnLevelStatisticCache> cache = Caffeine.newBuilder()
-            .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
-            .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
-            .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
-            .executor(threadPool)
-            .buildAsync(cacheLoader);
+    private final ColumnStatisticsCacheLoader columnStatisticsCacheLoader = new ColumnStatisticsCacheLoader();
+    private final HistogramCacheLoader histogramCacheLoader = new HistogramCacheLoader();
+
+    private final AsyncLoadingCache<StatisticsCacheKey, Optional<ColumnStatistic>> columnStatisticsCache =
+            Caffeine.newBuilder()
+                    .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
+                    .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
+                    .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
+                    .executor(threadPool)
+                    .buildAsync(columnStatisticsCacheLoader);
+
+    private final AsyncLoadingCache<StatisticsCacheKey, Optional<Histogram>> histogramCache =
+            Caffeine.newBuilder()
+                    .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
+                    .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
+                    .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
+                    .executor(threadPool)
+                    .buildAsync(histogramCacheLoader);
 
     {
         threadPool.submit(() -> {
             while (true) {
                 try {
-                    cacheLoader.removeExpiredInProgressing();
-                    Thread.sleep(TimeUnit.MINUTES.toMillis(15));
+                    columnStatisticsCacheLoader.removeExpiredInProgressing();
+                    histogramCacheLoader.removeExpiredInProgressing();
                 } catch (Throwable t) {
                     // IGNORE
                 }
+                Thread.sleep(TimeUnit.MINUTES.toMillis(15));
             }
 
         });
     }
 
     public ColumnStatistic getColumnStatistics(long tblId, String colName) {
-        ColumnLevelStatisticCache columnLevelStatisticCache = getColumnStatistics(tblId, -1, colName);
-        if (columnLevelStatisticCache == null) {
-            return ColumnStatistic.UNKNOWN;
-        }
-        return columnLevelStatisticCache.columnStatistic;
+        return getColumnStatistics(tblId, -1, colName).orElse(ColumnStatistic.UNKNOWN);
     }
 
-    public ColumnLevelStatisticCache getColumnStatistics(long tblId, long idxId, String colName) {
+    public Optional<ColumnStatistic> getColumnStatistics(long tblId, long idxId, String colName) {
         ConnectContext ctx = ConnectContext.get();
         if (ctx != null && ctx.getSessionVariable().internalSession) {
             return null;
         }
         StatisticsCacheKey k = new StatisticsCacheKey(tblId, idxId, colName);
         try {
-            CompletableFuture<ColumnLevelStatisticCache> f = cache.get(k);
+            CompletableFuture<Optional<ColumnStatistic>> f = columnStatisticsCache.get(k);
             if (f.isDone() && f.get() != null) {
                 return f.get();
             }
         } catch (Exception e) {
             LOG.warn("Unexpected exception while returning ColumnStatistic", e);
         }
-        return null;
+        return Optional.empty();
     }
 
     public Histogram getHistogram(long tblId, String colName) {
-        return getHistogram(tblId, -1, colName);
+        return getHistogram(tblId, -1, colName).orElse(null);
     }
 
-    public Histogram getHistogram(long tblId, long idxId, String colName) {
+    public Optional<Histogram> getHistogram(long tblId, long idxId, String colName) {
         ConnectContext ctx = ConnectContext.get();
         if (ctx != null && ctx.getSessionVariable().internalSession) {
-            return null;
+            return Optional.empty();
         }
         StatisticsCacheKey k = new StatisticsCacheKey(tblId, idxId, colName);
         try {
-            CompletableFuture<ColumnLevelStatisticCache> f = cache.get(k);
+            CompletableFuture<Optional<Histogram>> f = histogramCache.get(k);
             if (f.isDone() && f.get() != null) {
-                return f.get().getHistogram();
+                return f.get();
             }
         } catch (Exception e) {
             LOG.warn("Unexpected exception while returning Histogram", e);
         }
-        return null;
+        return Optional.empty();
     }
 
     // TODO: finish this method.
     public void eraseExpiredCache(long tblId, long idxId, String colName) {
-        cache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName));
+        columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName));
     }
 
-    public void updateCache(long tblId, long idxId, String colName, ColumnLevelStatisticCache statistic) {
-        cache.synchronous().put(new StatisticsCacheKey(tblId, idxId, colName), statistic);
+    public void updateCache(long tblId, long idxId, String colName, ColumnStatistic statistic) {
+        columnStatisticsCache.synchronous().put(new StatisticsCacheKey(tblId, idxId, colName), Optional.of(statistic));
     }
 
     public void refreshSync(long tblId, long idxId, String colName) {
-        cache.synchronous().refresh(new StatisticsCacheKey(tblId, idxId, colName));
+        columnStatisticsCache.synchronous().refresh(new StatisticsCacheKey(tblId, idxId, colName));
     }
+
+    public void preHeat() {
+        threadPool.submit(this::doPreHeat);
+    }
+
+    private void doPreHeat() {

Review Comment:
   Does preheat also load histograms?
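
   If they should be warmed up too, the preheat path could trigger both caches for each column it visits, e.g. (hypothetical helper, assuming preheat iterates over the analyzed columns):

    ```java
    // Hypothetical sketch: warm both caches for one column so histograms are
    // preheated alongside column statistics; AsyncLoadingCache#get triggers
    // an async load if the key is absent.
    private void preHeatColumn(long tblId, long idxId, String colName) {
        StatisticsCacheKey key = new StatisticsCacheKey(tblId, idxId, colName);
        columnStatisticsCache.get(key);
        histogramCache.get(key);
    }
    ```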



##########
fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java:
##########
@@ -40,7 +40,7 @@
 public class CacheTest extends TestWithFeService {
 
     @Test
-    public void testColumn(@Mocked StatisticsCacheLoader cacheLoader) throws Exception {
+    public void testColumn(@Mocked ColumnStatisticsCacheLoader cacheLoader) throws Exception {

Review Comment:
   Do we need a test for the histogram cache as well?



##########
fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJobTest.java:
##########
@@ -70,7 +70,7 @@ public void testExecute() throws Exception {
         }
         Statistics statistics = cascadesContext.getMemo().getRoot().getStatistics();
         Assertions.assertNotNull(statistics);
-        Assertions.assertTrue(Precision.equals(0.5, statistics.getRowCount(), 0.1));
+        Assertions.assertTrue(Precision.equals(0, statistics.getRowCount(), 0.1));

Review Comment:
   Why did this test change?



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java:
##########
@@ -104,7 +115,16 @@ public BaseAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTas
         init(info);
     }
 
+    protected void initUnsupportedType() {
+        unsupportedType.add(Type.HLL);
+        unsupportedType.add(Type.BITMAP);
+        unsupportedType.add(Type.ARRAY);
+        unsupportedType.add(Type.MAP);
+        unsupportedType.add(Type.JSONB);

Review Comment:
   Should the struct type be supported?



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java:
##########
@@ -47,25 +44,24 @@ public class AnalysisTaskScheduler {
 
     private final Set<BaseAnalysisTask> manualJobSet = new HashSet<>();
 
-    public synchronized void schedule(AnalysisTaskInfo analysisJobInfo) {
-        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(analysisJobInfo.catalogName);
-        Preconditions.checkArgument(catalog != null);
-        DatabaseIf db = catalog.getDbNullable(analysisJobInfo.dbName);
-        Preconditions.checkArgument(db != null);
-        TableIf table = db.getTableNullable(analysisJobInfo.tblName);
-        Preconditions.checkArgument(table != null);
-        BaseAnalysisTask analysisTask = table.createAnalysisTask(this, analysisJobInfo);
-        addToManualJobQueue(analysisTask);
-        if (analysisJobInfo.jobType.equals(JobType.MANUAL)) {
-            return;
-        }
-        addToSystemQueue(analysisTask);
-    }
-
-    private void removeFromSystemQueue(BaseAnalysisTask analysisJobInfo) {
-        if (manualJobSet.contains(analysisJobInfo)) {
-            systemJobQueue.remove(analysisJobInfo);
-            manualJobSet.remove(analysisJobInfo);
+    public synchronized void schedule(AnalysisTaskInfo analysisTaskInfo) {
+        try {
+            TableIf table = StatisticsUtil.findTable(analysisTaskInfo.catalogName,
+                    analysisTaskInfo.dbName, analysisTaskInfo.tblName);
+            BaseAnalysisTask analysisTask = table.createAnalysisTask(this, analysisTaskInfo);
+            switch (analysisTaskInfo.jobType) {
+                case MANUAL:
+                    addToManualJobQueue(analysisTask);

Review Comment:
   Add an unsafe prefix to all unsafe functions, e.g. rename `addToManualJobQueue` to `unsafeAddToManualJobQueue`. Or add a comment to every unsafe function.
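
   For instance, a sketch of the naming/comment convention being suggested (not an actual diff; the body is elided):

    ```java
    // Sketch: make the locking contract explicit via the name and a comment.
    /** Not thread-safe: caller must hold this scheduler's monitor (i.e. call from a synchronized method). */
    private void unsafeAddToManualJobQueue(BaseAnalysisTask analysisTask) {
        // ... existing addToManualJobQueue body ...
    }
    ```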



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

