You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/05 10:20:59 UTC

[kylin] 07/22: KYLIN-5315 update AutoRefreshSnapshotScheduler afterPropertiesSet

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 275d57dea54a3752cf621e86b7660d4a7eddb13e
Author: jlf <lo...@kyligence.io>
AuthorDate: Fri Oct 14 10:01:24 2022 +0800

    KYLIN-5315 update AutoRefreshSnapshotScheduler afterPropertiesSet
---
 .../rest/scheduler/AutoRefreshSnapshotRunner.java  | 64 ++++++++-------------
 .../scheduler/AutoRefreshSnapshotScheduler.java    | 38 +++++++++++--
 ...pshotThread.java => BuildSnapshotRunnable.java} |  4 +-
 ...leThread.java => CheckSourceTableRunnable.java} |  2 +-
 .../scheduler/AutoRefreshSnapshotConfigTest.java   | 58 +++++++++++++++++++
 .../scheduler/AutoRefreshSnapshotRunnerTest.java   | 66 ++++++----------------
 ...eadTest.java => BuildSnapshotRunnableTest.java} | 20 +++----
 ...Test.java => CheckSourceTableRunnableTest.java} | 11 ++--
 .../SnapshotSourceTableStatsServiceTest.scala      |  7 +--
 .../service/SnapshotSourceTableStatsService.java   | 21 ++++---
 .../TestSnapshotSourceTableStatsService.java       | 36 ------------
 11 files changed, 166 insertions(+), 161 deletions(-)

diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java
index c3e7ce5046..294b6ddb75 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java
@@ -87,8 +87,6 @@ public class AutoRefreshSnapshotRunner implements Runnable {
     @Getter
     private Map<Future<String>, Long> checkSourceTableFutures = Maps.newConcurrentMap();
     @Getter
-    private Map<Future<String>, Long> buildSnapshotFutures = Maps.newConcurrentMap();
-    @Getter
     private final String project;
     @Setter
     @Getter
@@ -141,6 +139,8 @@ public class AutoRefreshSnapshotRunner implements Runnable {
                         poolExecutor.getPoolSize(), poolExecutor.getCorePoolSize(), poolExecutor.getActiveCount(),
                         poolExecutor.getMaximumPoolSize());
             }
+            projectConfig = NProjectManager.getInstance(KylinConfig.readSystemKylinConfig()).getProject(project)
+                    .getConfig();
             saveSnapshotViewMapping(project, restTemplate);
             val tables = SnapshotJobUtils.getSnapshotTables(projectConfig, project);
             val viewTableMapping = readViewTableMapping();
@@ -152,8 +152,6 @@ public class AutoRefreshSnapshotRunner implements Runnable {
 
             waitCheckSourceTableTaskDone();
 
-            waitBuildSnapshotTaskDone();
-
             log.info("Project[{}] stop check and refresh snapshot", project);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
@@ -163,9 +161,7 @@ public class AutoRefreshSnapshotRunner implements Runnable {
         } finally {
             checkSourceTableQueue = new LinkedBlockingQueue<>();
             cancelFuture(checkSourceTableFutures);
-            cancelFuture(buildSnapshotFutures);
             checkSourceTableFutures = Maps.newConcurrentMap();
-            buildSnapshotFutures = Maps.newConcurrentMap();
             sourceTableSnapshotMapping = Maps.newHashMap();
             buildSnapshotCount = Maps.newConcurrentMap();
         }
@@ -214,10 +210,12 @@ public class AutoRefreshSnapshotRunner implements Runnable {
             }
         }
         for (TableDesc tableDesc : tables) {
-            val source = tableDesc.getIdentity().toLowerCase(Locale.ROOT);
-            val snapshots = result.getOrDefault(source, Lists.newArrayList());
-            snapshots.add(tableDesc);
-            result.put(source, snapshots.stream().distinct().collect(Collectors.toList()));
+            if (!tableDesc.isView()) {
+                val source = tableDesc.getIdentity().toLowerCase(Locale.ROOT);
+                val snapshots = result.getOrDefault(source, Lists.newArrayList());
+                snapshots.add(tableDesc);
+                result.put(source, snapshots.stream().distinct().collect(Collectors.toList()));
+            }
         }
         return result;
     }
@@ -259,16 +257,16 @@ public class AutoRefreshSnapshotRunner implements Runnable {
 
     public void checkSourceTable(Set<String> allSourceTable) {
         for (String table : allSourceTable) {
-            val thread = new CheckSourceTableThread();
-            thread.setProject(project);
-            thread.setConfig(projectConfig);
-            thread.setTableIdentity(table);
-            thread.setRestTemplate(restTemplate);
-            thread.setCheckSourceTableQueue(checkSourceTableQueue);
+            val runnable = new CheckSourceTableRunnable();
+            runnable.setProject(project);
+            runnable.setConfig(projectConfig);
+            runnable.setTableIdentity(table);
+            runnable.setRestTemplate(restTemplate);
+            runnable.setCheckSourceTableQueue(checkSourceTableQueue);
             sourceTableSnapshotMapping.get(table).stream()
                     .filter(tableDesc -> StringUtils.equalsIgnoreCase(table, tableDesc.getIdentity())).findFirst()
-                    .ifPresent(tableDesc -> thread.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol()));
-            val submit = jobPool.submit(thread, "success");
+                    .ifPresent(tableDesc -> runnable.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol()));
+            val submit = jobPool.submit(runnable, "success");
             checkSourceTableFutures.put(submit, System.currentTimeMillis());
         }
     }
@@ -304,33 +302,21 @@ public class AutoRefreshSnapshotRunner implements Runnable {
         }
     }
 
-    public void waitBuildSnapshotTaskDone() throws InterruptedException {
-        while (true) {
-            val doneCount = buildSnapshotFutures.keySet().stream().filter(Future::isDone).count();
-            if (buildSnapshotFutures.size() == doneCount) {
-                break;
-            }
-            cancelTimeoutFuture(buildSnapshotFutures);
-            TimeUnit.SECONDS.sleep(10);
-        }
-    }
-
     public void buildSnapshot(CheckSourceTableResult result) {
         val needBuildSnapshots = sourceTableSnapshotMapping.get(result.getTableIdentity());
         for (TableDesc tableDesc : needBuildSnapshots) {
             val sourceTableCount = buildSnapshotCount.getOrDefault(tableDesc.getIdentity(), new AtomicInteger(0));
             log.info("buildSnapshotCount is [{}], tableIdentity is [{}]", sourceTableCount, tableDesc.getIdentity());
             if (sourceTableCount.getAndIncrement() == 0) {
-                val thread = new BuildSnapshotThread();
-                thread.setProject(project);
-                thread.setConfig(projectConfig);
-                thread.setRestTemplate(restTemplate);
-                thread.setNeedRefresh(result.getNeedRefresh());
-                thread.setNeedRefreshPartitionsValue(result.getNeedRefreshPartitionsValue());
-                thread.setTableIdentity(tableDesc.getIdentity());
-                thread.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol());
-                val submit = jobPool.submit(thread, "success");
-                buildSnapshotFutures.put(submit, System.currentTimeMillis());
+                val runnable = new BuildSnapshotRunnable();
+                runnable.setProject(project);
+                runnable.setConfig(projectConfig);
+                runnable.setRestTemplate(restTemplate);
+                runnable.setNeedRefresh(result.getNeedRefresh());
+                runnable.setNeedRefreshPartitionsValue(result.getNeedRefreshPartitionsValue());
+                runnable.setTableIdentity(tableDesc.getIdentity());
+                runnable.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol());
+                runnable.run();
             }
             buildSnapshotCount.put(tableDesc.getIdentity(), sourceTableCount);
         }
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.java
index 539bf6ac08..8639c09d30 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.java
@@ -26,17 +26,23 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.annotation.PostConstruct;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.scheduler.EpochStartedNotifier;
+import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
-import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.annotation.Order;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -46,7 +52,7 @@ import org.springframework.web.client.RestTemplate;
 
 import com.google.common.collect.Maps;
 
-import org.apache.kylin.metadata.epoch.EpochManager;
+import io.kyligence.kap.guava20.shaded.common.eventbus.Subscribe;
 import lombok.Getter;
 import lombok.val;
 import lombok.extern.slf4j.Slf4j;
@@ -66,7 +72,7 @@ import lombok.extern.slf4j.Slf4j;
  */
 @Slf4j
 @Component
-public class AutoRefreshSnapshotScheduler implements InitializingBean {
+public class AutoRefreshSnapshotScheduler {
     private static final Integer THREAD_POOL_TASK_SCHEDULER_DEFAULT_POOL_SIZE = 20;
     @Autowired
     @Qualifier("projectScheduler")
@@ -228,7 +234,6 @@ public class AutoRefreshSnapshotScheduler implements InitializingBean {
         }
     }
 
-    @Override
     public void afterPropertiesSet() throws Exception {
         log.info("AutoRefreshSnapshotScheduler init...");
         val fs = HadoopUtil.getWorkingFileSystem();
@@ -257,3 +262,28 @@ public class AutoRefreshSnapshotScheduler implements InitializingBean {
         }
     }
 }
+
+@Slf4j
+@Configuration
+@Order
+class AutoRefreshSnapshotConfig {
+    @Autowired
+    private AutoRefreshSnapshotScheduler scheduler;
+
+    @PostConstruct
+    public void init() {
+        val kylinConfig = KylinConfig.getInstanceFromEnv();
+        if (kylinConfig.isJobNode()) {
+            EventBusFactory.getInstance().register(this, false);
+        }
+    }
+
+    @Subscribe
+    public void registerScheduler(EpochStartedNotifier notifier) {
+        try {
+            scheduler.afterPropertiesSet();
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+}
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotThread.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnable.java
similarity index 99%
rename from src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotThread.java
rename to src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnable.java
index 4759260d54..e9ebd6c67a 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotThread.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnable.java
@@ -42,6 +42,7 @@ import org.apache.kylin.common.exception.KylinRuntimeException;
 import org.apache.kylin.common.response.RestResponse;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.engine.spark.job.NSparkSnapshotJob;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.NExecutableManager;
@@ -53,7 +54,6 @@ import org.springframework.http.HttpMethod;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 
-import org.apache.kylin.engine.spark.job.NSparkSnapshotJob;
 import io.kyligence.kap.guava20.shaded.common.collect.Lists;
 import io.kyligence.kap.guava20.shaded.common.collect.Maps;
 import io.kyligence.kap.guava20.shaded.common.collect.Sets;
@@ -64,7 +64,7 @@ import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class BuildSnapshotThread extends AbstractSchedulerRunnable {
+public class BuildSnapshotRunnable extends AbstractSchedulerRunnable {
     private static final String BUILD_SNAPSHOT_ERROR_MESSAGE = "Project[%s] Snapshot[%s] buildSnapshot failed";
 
     @Override
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableThread.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnable.java
similarity index 98%
rename from src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableThread.java
rename to src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnable.java
index d983fec99f..9fc0fd9d1d 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableThread.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnable.java
@@ -46,7 +46,7 @@ import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class CheckSourceTableThread extends AbstractSchedulerRunnable {
+public class CheckSourceTableRunnable extends AbstractSchedulerRunnable {
 
     private static final String SNAPSHOT_TABLE_CHECK_ERROR_MESSAGE = "Project[%s] Snapshot source table[%s] check table stats Failed";
 
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotConfigTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotConfigTest.java
new file mode 100644
index 0000000000..450f0bc088
--- /dev/null
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotConfigTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.scheduler;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.scheduler.EpochStartedNotifier;
+import org.apache.kylin.common.scheduler.EventBusFactory;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import lombok.val;
+
+class AutoRefreshSnapshotConfigTest {
+    @Test
+    void testRegisterScheduler() throws Exception {
+        registerScheduler(true);
+        registerScheduler(false);
+    }
+
+    void registerScheduler(Boolean isJobNode) throws Exception {
+        try (val mockStatic = Mockito.mockStatic(EventBusFactory.class);
+                val configStatic = Mockito.mockStatic(KylinConfig.class)) {
+            val config = Mockito.mock(KylinConfig.class);
+            Mockito.when(config.isJobNode()).thenReturn(isJobNode);
+            configStatic.when(KylinConfig::getInstanceFromEnv).thenReturn(config);
+
+            val buildConfig = new AutoRefreshSnapshotConfig();
+            val eventBus = Mockito.mock(EventBusFactory.class);
+            mockStatic.when(EventBusFactory::getInstance).thenReturn(eventBus);
+            buildConfig.init();
+            val scheduler = Mockito.mock(AutoRefreshSnapshotScheduler.class);
+            ReflectionTestUtils.setField(buildConfig, "scheduler", scheduler);
+
+            Mockito.doNothing().when(scheduler).afterPropertiesSet();
+            buildConfig.registerScheduler(new EpochStartedNotifier());
+
+            Mockito.doThrow(new Exception("test")).when(scheduler).afterPropertiesSet();
+            buildConfig.registerScheduler(new EpochStartedNotifier());
+        }
+    }
+}
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunnerTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunnerTest.java
index ae62d5c7a4..5dd01bf60a 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunnerTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunnerTest.java
@@ -35,7 +35,6 @@ import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -67,7 +66,6 @@ import io.kyligence.kap.guava20.shaded.common.collect.Lists;
 import io.kyligence.kap.guava20.shaded.common.collect.Maps;
 import io.kyligence.kap.guava20.shaded.common.collect.Sets;
 import lombok.val;
-import lombok.var;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -116,7 +114,6 @@ class AutoRefreshSnapshotRunnerTest {
             runner.doRun();
             assertTrue(CollectionUtils.isEmpty(runner.getCheckSourceTableQueue()));
             assertTrue(MapUtils.isEmpty(runner.getBuildSnapshotCount()));
-            assertTrue(MapUtils.isEmpty(runner.getBuildSnapshotFutures()));
             assertTrue(MapUtils.isEmpty(runner.getCheckSourceTableFutures()));
             assertTrue(MapUtils.isEmpty(runner.getSourceTableSnapshotMapping()));
         } finally {
@@ -247,6 +244,16 @@ class AutoRefreshSnapshotRunnerTest {
                 if (i < 14) {
                     tables.add(allTables.get(i));
                 }
+                if (allTables.get(i).isView()) {
+                    tables.add(allTables.get(i));
+                    val sourceTables = Sets.<String> newHashSet();
+                    for (int j = 0; j < 7; j++) {
+                        sourceTables.add("default.table_" + j);
+                        excepted.add("default.table_" + j);
+                    }
+                    sourceTables.add(allTables.get(i).getIdentity().toLowerCase(Locale.ROOT));
+                    viewTableMapping.put(allTables.get(i).getIdentity(), sourceTables);
+                }
                 if (i > 7) {
                     val sourceTables = Sets.<String> newHashSet();
                     for (int j = 0; j < 7; j++) {
@@ -307,9 +314,9 @@ class AutoRefreshSnapshotRunnerTest {
             val sourceTables = sourceTableSnapshotMapping.keySet();
             runner.getSourceTableSnapshotMapping().putAll(sourceTableSnapshotMapping);
 
-            try (val ignored = Mockito.mockConstruction(CheckSourceTableThread.class,
+            try (val ignored = Mockito.mockConstruction(CheckSourceTableRunnable.class,
                     (mock, context) -> Mockito.doNothing().when(mock).checkTable())) {
-                try (val ignored2 = Mockito.mockConstruction(BuildSnapshotThread.class,
+                try (val ignored2 = Mockito.mockConstruction(BuildSnapshotRunnable.class,
                         (mock, context) -> Mockito.doNothing().when(mock).buildSnapshot())) {
                     runner.checkSourceTable(sourceTables);
 
@@ -334,9 +341,6 @@ class AutoRefreshSnapshotRunnerTest {
                     exceptedTmp.addAll(tableDescs);
                     val excepted = exceptedTmp.stream().distinct().collect(Collectors.toList());
                     assertEquals(excepted.size(), buildSnapshotCount.size());
-
-                    val buildSnapshotFutures = runner.getBuildSnapshotFutures();
-                    assertEquals(excepted.size(), buildSnapshotFutures.size());
                 }
             }
         } finally {
@@ -344,34 +348,6 @@ class AutoRefreshSnapshotRunnerTest {
         }
     }
 
-    @Test
-    void waitBuildSnapshotTaskDone() {
-        val project = "default";
-        try {
-            val runner = AutoRefreshSnapshotRunner.getInstance(project);
-            val tasks = Lists.<Future<String>> newArrayList();
-            for (int i = 0; i < 5; i++) {
-                val futureTask = new FutureTask<String>(() -> null);
-                tasks.add(futureTask);
-                runner.getBuildSnapshotFutures().put(futureTask, System.currentTimeMillis());
-            }
-            val result = new AtomicBoolean(false);
-            val thread = new Thread(() -> {
-                try {
-                    runner.waitBuildSnapshotTaskDone();
-                    result.set(true);
-                } catch (InterruptedException e) {
-                    log.error(e.getMessage(), e);
-                }
-            });
-            thread.start();
-            tasks.forEach(task -> task.cancel(true));
-            await().atMost(new Duration(12, SECONDS)).untilAsserted(() -> assertTrue(result.get()));
-        } finally {
-            AutoRefreshSnapshotRunner.shutdown(project);
-        }
-    }
-
     @Test
     void cancelTimeoutFuture() {
         val project = RandomUtil.randomUUIDStr();
@@ -385,11 +361,11 @@ class AutoRefreshSnapshotRunnerTest {
             for (int i = 0; i < 5; i++) {
                 val futureTask = new FutureTask<String>(() -> null);
                 tasks.add(futureTask);
-                runner.getBuildSnapshotFutures().put(futureTask, System.currentTimeMillis());
+                runner.getCheckSourceTableFutures().put(futureTask, System.currentTimeMillis());
             }
             await().pollDelay(new Duration(2, SECONDS)).until(() -> true);
-            runner.cancelTimeoutFuture(runner.getBuildSnapshotFutures());
-            runner.getBuildSnapshotFutures().keySet().forEach(future -> {
+            runner.cancelTimeoutFuture(runner.getCheckSourceTableFutures());
+            runner.getCheckSourceTableFutures().keySet().forEach(future -> {
                 assertTrue(future.isCancelled());
                 assertTrue(future.isDone());
             });
@@ -457,16 +433,6 @@ class AutoRefreshSnapshotRunnerTest {
             val overrideProps = Maps.<String, String> newLinkedHashMap();
             projectManager.createProject(project, "test", "", overrideProps);
             val runner = AutoRefreshSnapshotRunner.getInstance(project);
-            for (int i = 0; i < 5; i++) {
-                val futureTask = new FutureTask<String>(() -> null);
-                runner.getBuildSnapshotFutures().put(futureTask, System.currentTimeMillis());
-                if (i % 2 == 0) {
-                    futureTask.cancel(true);
-                }
-            }
-            runner.cancelFuture(runner.getBuildSnapshotFutures());
-            var actual = runner.getBuildSnapshotFutures().keySet().stream().filter(Future::isDone).count();
-            assertEquals(runner.getBuildSnapshotFutures().size(), actual);
 
             for (int i = 0; i < 5; i++) {
                 val futureTask = new FutureTask<String>(() -> null);
@@ -476,7 +442,7 @@ class AutoRefreshSnapshotRunnerTest {
                 }
             }
             runner.cancelFuture(runner.getCheckSourceTableFutures());
-            actual = runner.getCheckSourceTableFutures().keySet().stream().filter(Future::isDone).count();
+            val actual = runner.getCheckSourceTableFutures().keySet().stream().filter(Future::isDone).count();
             assertEquals(runner.getCheckSourceTableFutures().size(), actual);
         } finally {
             AutoRefreshSnapshotRunner.shutdown(project);
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotThreadTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnableTest.java
similarity index 97%
rename from src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotThreadTest.java
rename to src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnableTest.java
index 214ac7954d..b2d856887e 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotThreadTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnableTest.java
@@ -36,6 +36,7 @@ import org.apache.kylin.common.exception.KylinRuntimeException;
 import org.apache.kylin.common.response.RestResponse;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.engine.spark.job.NSparkSnapshotJob;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.NExecutableManager;
@@ -51,7 +52,6 @@ import org.springframework.web.client.RestTemplate;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 
-import org.apache.kylin.engine.spark.job.NSparkSnapshotJob;
 import io.kyligence.kap.guava20.shaded.common.collect.Lists;
 import io.kyligence.kap.guava20.shaded.common.collect.Maps;
 import io.kyligence.kap.guava20.shaded.common.collect.Sets;
@@ -59,12 +59,12 @@ import lombok.val;
 import lombok.var;
 
 @MetadataInfo
-class BuildSnapshotThreadTest {
+class BuildSnapshotRunnableTest {
     private final RestTemplate restTemplate = Mockito.mock(RestTemplate.class);
 
     @Test
     void buildSnapshot() throws JsonProcessingException {
-        val thread = new BuildSnapshotThread();
+        val thread = new BuildSnapshotRunnable();
         thread.setProject("project");
         thread.setConfig(KylinConfig.readSystemKylinConfig());
         thread.setRestTemplate(restTemplate);
@@ -112,7 +112,7 @@ class BuildSnapshotThreadTest {
 
     @Test
     void buildSnapshotFailed() throws JsonProcessingException {
-        val thread = new BuildSnapshotThread();
+        val thread = new BuildSnapshotRunnable();
         thread.setProject("project");
         thread.setConfig(KylinConfig.readSystemKylinConfig());
         thread.setRestTemplate(restTemplate);
@@ -149,7 +149,7 @@ class BuildSnapshotThreadTest {
 
     @Test
     void checkSnapshotJobFile() {
-        val thread = new BuildSnapshotThread();
+        val thread = new BuildSnapshotRunnable();
         thread.setConfig(KylinConfig.getInstanceFromEnv());
         thread.setTableIdentity("default.table_" + RandomUtil.randomUUIDStr().replace("-", "_"));
         val jobId = RandomUtil.randomUUIDStr();
@@ -182,7 +182,7 @@ class BuildSnapshotThreadTest {
     @Test
     void checkAutoRefreshJobSuccessOrRunning() {
         val jobId = RandomUtil.randomUUIDStr();
-        val thread = new BuildSnapshotThread();
+        val thread = new BuildSnapshotRunnable();
         thread.setConfig(KylinConfig.getInstanceFromEnv());
         assertFalse(thread.checkAutoRefreshJobSuccessOrRunning(jobId));
 
@@ -205,7 +205,7 @@ class BuildSnapshotThreadTest {
 
     @Test
     void snapshotJobFile() {
-        val thread = new BuildSnapshotThread();
+        val thread = new BuildSnapshotRunnable();
         thread.setConfig(KylinConfig.getInstanceFromEnv());
         thread.setTableIdentity("default.table_" + RandomUtil.randomUUIDStr().replace("-", "_"));
         val jobId = RandomUtil.randomUUIDStr();
@@ -219,7 +219,7 @@ class BuildSnapshotThreadTest {
 
     @Test
     void snapshotJobFileNotExists() {
-        val thread = new BuildSnapshotThread();
+        val thread = new BuildSnapshotRunnable();
         thread.setConfig(KylinConfig.getInstanceFromEnv());
         thread.setTableIdentity("default.table_" + RandomUtil.randomUUIDStr().replace("-", "_"));
         val snapshotJob = thread.readSnapshotJobFile();
@@ -228,7 +228,7 @@ class BuildSnapshotThreadTest {
 
     @Test
     void checkNeedBuildPartitionAndSetTableOption() throws JsonProcessingException {
-        val thread = new BuildSnapshotThread();
+        val thread = new BuildSnapshotRunnable();
         thread.setTableIdentity("default.table");
         val req = Maps.newHashMap();
         val runningJobs = Lists.<NSparkSnapshotJob> newArrayList();
@@ -279,7 +279,7 @@ class BuildSnapshotThreadTest {
             Mockito.when(executableManager.listExecByJobTypeAndStatus(ExecutableState::isRunning, SNAPSHOT_BUILD,
                     SNAPSHOT_REFRESH)).thenReturn(runningJobs);
 
-            val thread = new BuildSnapshotThread();
+            val thread = new BuildSnapshotRunnable();
             thread.setTableIdentity("default.table");
             thread.setProject("default");
             try {
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableThreadTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnableTest.java
similarity index 93%
rename from src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableThreadTest.java
rename to src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnableTest.java
index 920e89bd76..db82b60d66 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableThreadTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnableTest.java
@@ -42,12 +42,12 @@ import io.kyligence.kap.guava20.shaded.common.collect.Sets;
 import lombok.val;
 
 @MetadataInfo
-class CheckSourceTableThreadTest {
+class CheckSourceTableRunnableTest {
     private final RestTemplate restTemplate = Mockito.mock(RestTemplate.class);
 
     @Test
     void checkTable() throws JsonProcessingException {
-        val thread = new CheckSourceTableThread();
+        val thread = new CheckSourceTableRunnable();
         thread.setProject("project");
         thread.setConfig(KylinConfig.readSystemKylinConfig());
         thread.setTableIdentity("default.table");
@@ -72,7 +72,7 @@ class CheckSourceTableThreadTest {
     @Test
     void checkTableFailed() {
         try {
-            val thread = new CheckSourceTableThread();
+            val thread = new CheckSourceTableRunnable();
             thread.setProject("project");
             thread.setConfig(KylinConfig.readSystemKylinConfig());
             thread.setTableIdentity("default.table");
@@ -83,9 +83,8 @@ class CheckSourceTableThreadTest {
             thread.checkTable();
         } catch (Exception e) {
             assertTrue(e instanceof KylinRuntimeException);
-            assertEquals(
-                    "Project[project] Snapshot source table[default.table] check table stats Failed",
+            assertEquals("Project[project] Snapshot source table[default.table] check table stats Failed",
                     e.getMessage());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala b/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala
index 635b6e54f7..6663a71041 100644
--- a/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala
+++ b/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala
@@ -194,9 +194,8 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local
         val tableIdentity = table.qualifiedName.toLowerCase(Locale.ROOT)
         val locationPath = table.location.getPath
         val locationFilesStatus: util.List[FileStatus] = snapshotSourceTableStatsService.getLocationFileStatus(locationPath)
-        val snapshotTablesLocationsJson = Maps.newHashMap[String, SnapshotSourceTableStats]()
-        snapshotSourceTableStatsService.createSnapshotSourceTableStats(locationPath, config,
-          locationFilesStatus, snapshotTablesLocationsJson)
+        val snapshotTablesLocationsJson = snapshotSourceTableStatsService.createSnapshotSourceTableStats(locationPath, config,
+          locationFilesStatus)
         snapshotSourceTableStatsService.writeSourceTableStats(DEFAULT_PROJECT, tableIdentity, snapshotTablesLocationsJson)
 
         val fromJson = snapshotSourceTableStatsService.getSnapshotSourceTableStatsJsonFromHDFS(DEFAULT_PROJECT, tableIdentity).getSecond
@@ -522,4 +521,4 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local
       assertFalse(checkStatsFile)
     })
   }
-}
\ No newline at end of file
+}
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java
index 360ecad17f..8dcc09980d 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java
@@ -141,6 +141,7 @@ public class SnapshotSourceTableStatsService extends BasicService {
         } catch (Exception e) {
             log.info("Project[{}] [{}.{}] refresh check and save snapshot table location files failed", project,
                     database, table);
+            log.error(e.getMessage(), e);
             return new SnapshotSourceTableStatsResponse(false);
         }
     }
@@ -197,8 +198,8 @@ public class SnapshotSourceTableStatsService extends BasicService {
         }
         val needRefresh = checkLocation(location, filesStatus, snapshotSourceTableStatsJson, projectConfig);
         if (Boolean.FALSE.equals(snapshotSourceTableStatsJsonExist) || Boolean.TRUE.equals(needRefresh)) {
-            createSnapshotSourceTableStats(location, projectConfig, filesStatus, snapshotSourceTableStatsJson);
-            writeSourceTableStats(project, tableIdentity, snapshotSourceTableStatsJson);
+            val newSnapshotSourceTableStatsJson = createSnapshotSourceTableStats(location, projectConfig, filesStatus);
+            writeSourceTableStats(project, tableIdentity, newSnapshotSourceTableStatsJson);
         }
         if (Boolean.FALSE.equals(snapshotSourceTableStatsJsonExist)) {
             return projectConfig.isSnapshotFirstAutoRefreshEnabled();
@@ -279,10 +280,10 @@ public class SnapshotSourceTableStatsService extends BasicService {
                         tableFilesModifyTimesAndSize.get(FILES_SIZE));
     }
 
-    public void createSnapshotSourceTableStats(String location, KylinConfig config,
-            List<FileStatus> locationFilesStatus, Map<String, SnapshotSourceTableStats> snapshotSourceTableStatsJson) {
-        val sourceTableStats = snapshotSourceTableStatsJson.computeIfAbsent(location,
-                key -> new SnapshotSourceTableStats());
+    public Map<String, SnapshotSourceTableStats> createSnapshotSourceTableStats(String location, KylinConfig config,
+            List<FileStatus> locationFilesStatus) {
+        Map<String, SnapshotSourceTableStats> newSnapshotSourceTableStatsJson = Maps.newHashMap();
+        val sourceTableStats = new SnapshotSourceTableStats();
         val filesSize = Lists.<Long> newArrayList();
         val filesModificationTime = Lists.<Long> newArrayList();
         locationFilesStatus.stream().limit(config.getSnapshotAutoRefreshFetchFilesCount()).forEach(fileStatus -> {
@@ -293,7 +294,8 @@ public class SnapshotSourceTableStatsService extends BasicService {
         sourceTableStats.setFilesModificationTime(filesModificationTime);
         sourceTableStats.setFilesCount(locationFilesStatus.size());
 
-        snapshotSourceTableStatsJson.put(location, sourceTableStats);
+        newSnapshotSourceTableStatsJson.put(location, sourceTableStats);
+        return newSnapshotSourceTableStatsJson;
     }
 
     public void writeSourceTableStats(String project, String tableIdentity,
@@ -332,11 +334,12 @@ public class SnapshotSourceTableStatsService extends BasicService {
         val needRefresh = checkPartitionsLocation(partitions, snapshotSourceTableStatsJson, needRefreshPartitions,
                 needSavePartitionsFilesStatus, projectConfig);
         if (Boolean.FALSE.equals(snapshotSourceTableStatsJsonExist) || Boolean.TRUE.equals(needRefresh)) {
+            Map<String, SnapshotSourceTableStats> newSnapshotSourceTableStatsJson = Maps.newHashMap();
             for (CatalogTablePartition partition : partitions) {
                 createPartitionSnapshotSourceTableStats(partition, needSavePartitionsFilesStatus,
-                        snapshotSourceTableStatsJson, projectConfig);
+                        newSnapshotSourceTableStatsJson, projectConfig);
             }
-            writeSourceTableStats(project, tableIdentity, snapshotSourceTableStatsJson);
+            writeSourceTableStats(project, tableIdentity, newSnapshotSourceTableStatsJson);
         }
         if (Boolean.FALSE.equals(snapshotSourceTableStatsJsonExist)) {
             return projectConfig.isSnapshotFirstAutoRefreshEnabled();
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/TestSnapshotSourceTableStatsService.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/TestSnapshotSourceTableStatsService.java
deleted file mode 100644
index 45b6850d92..0000000000
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/TestSnapshotSourceTableStatsService.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.rest.service;
-
-import org.apache.kylin.junit.annotation.MetadataInfo;
-import org.junit.Test;
-import org.mockito.InjectMocks;
-import org.mockito.Mockito;
-
-@MetadataInfo
-class TestSnapshotSourceTableStatsService {
-    private static final String DEFAULT_PROJECT = "default";
-    @InjectMocks
-    private final SnapshotSourceTableStatsService locationService = Mockito.spy(SnapshotSourceTableStatsService.class);
-
-    @Test
-    void saveSnapshotViewMapping() {
-
-    }
-}