You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/05 10:20:59 UTC
[kylin] 07/22: KYLIN-5315 update AutoRefreshSnapshotScheduler afterPropertiesSet
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 275d57dea54a3752cf621e86b7660d4a7eddb13e
Author: jlf <lo...@kyligence.io>
AuthorDate: Fri Oct 14 10:01:24 2022 +0800
KYLIN-5315 update AutoRefreshSnapshotScheduler afterPropertiesSet
---
.../rest/scheduler/AutoRefreshSnapshotRunner.java | 64 ++++++++-------------
.../scheduler/AutoRefreshSnapshotScheduler.java | 38 +++++++++++--
...pshotThread.java => BuildSnapshotRunnable.java} | 4 +-
...leThread.java => CheckSourceTableRunnable.java} | 2 +-
.../scheduler/AutoRefreshSnapshotConfigTest.java | 58 +++++++++++++++++++
.../scheduler/AutoRefreshSnapshotRunnerTest.java | 66 ++++++----------------
...eadTest.java => BuildSnapshotRunnableTest.java} | 20 +++----
...Test.java => CheckSourceTableRunnableTest.java} | 11 ++--
.../SnapshotSourceTableStatsServiceTest.scala | 7 +--
.../service/SnapshotSourceTableStatsService.java | 21 ++++---
.../TestSnapshotSourceTableStatsService.java | 36 ------------
11 files changed, 166 insertions(+), 161 deletions(-)
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java
index c3e7ce5046..294b6ddb75 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java
@@ -87,8 +87,6 @@ public class AutoRefreshSnapshotRunner implements Runnable {
@Getter
private Map<Future<String>, Long> checkSourceTableFutures = Maps.newConcurrentMap();
@Getter
- private Map<Future<String>, Long> buildSnapshotFutures = Maps.newConcurrentMap();
- @Getter
private final String project;
@Setter
@Getter
@@ -141,6 +139,8 @@ public class AutoRefreshSnapshotRunner implements Runnable {
poolExecutor.getPoolSize(), poolExecutor.getCorePoolSize(), poolExecutor.getActiveCount(),
poolExecutor.getMaximumPoolSize());
}
+ projectConfig = NProjectManager.getInstance(KylinConfig.readSystemKylinConfig()).getProject(project)
+ .getConfig();
saveSnapshotViewMapping(project, restTemplate);
val tables = SnapshotJobUtils.getSnapshotTables(projectConfig, project);
val viewTableMapping = readViewTableMapping();
@@ -152,8 +152,6 @@ public class AutoRefreshSnapshotRunner implements Runnable {
waitCheckSourceTableTaskDone();
- waitBuildSnapshotTaskDone();
-
log.info("Project[{}] stop check and refresh snapshot", project);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -163,9 +161,7 @@ public class AutoRefreshSnapshotRunner implements Runnable {
} finally {
checkSourceTableQueue = new LinkedBlockingQueue<>();
cancelFuture(checkSourceTableFutures);
- cancelFuture(buildSnapshotFutures);
checkSourceTableFutures = Maps.newConcurrentMap();
- buildSnapshotFutures = Maps.newConcurrentMap();
sourceTableSnapshotMapping = Maps.newHashMap();
buildSnapshotCount = Maps.newConcurrentMap();
}
@@ -214,10 +210,12 @@ public class AutoRefreshSnapshotRunner implements Runnable {
}
}
for (TableDesc tableDesc : tables) {
- val source = tableDesc.getIdentity().toLowerCase(Locale.ROOT);
- val snapshots = result.getOrDefault(source, Lists.newArrayList());
- snapshots.add(tableDesc);
- result.put(source, snapshots.stream().distinct().collect(Collectors.toList()));
+ if (!tableDesc.isView()) {
+ val source = tableDesc.getIdentity().toLowerCase(Locale.ROOT);
+ val snapshots = result.getOrDefault(source, Lists.newArrayList());
+ snapshots.add(tableDesc);
+ result.put(source, snapshots.stream().distinct().collect(Collectors.toList()));
+ }
}
return result;
}
@@ -259,16 +257,16 @@ public class AutoRefreshSnapshotRunner implements Runnable {
public void checkSourceTable(Set<String> allSourceTable) {
for (String table : allSourceTable) {
- val thread = new CheckSourceTableThread();
- thread.setProject(project);
- thread.setConfig(projectConfig);
- thread.setTableIdentity(table);
- thread.setRestTemplate(restTemplate);
- thread.setCheckSourceTableQueue(checkSourceTableQueue);
+ val runnable = new CheckSourceTableRunnable();
+ runnable.setProject(project);
+ runnable.setConfig(projectConfig);
+ runnable.setTableIdentity(table);
+ runnable.setRestTemplate(restTemplate);
+ runnable.setCheckSourceTableQueue(checkSourceTableQueue);
sourceTableSnapshotMapping.get(table).stream()
.filter(tableDesc -> StringUtils.equalsIgnoreCase(table, tableDesc.getIdentity())).findFirst()
- .ifPresent(tableDesc -> thread.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol()));
- val submit = jobPool.submit(thread, "success");
+ .ifPresent(tableDesc -> runnable.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol()));
+ val submit = jobPool.submit(runnable, "success");
checkSourceTableFutures.put(submit, System.currentTimeMillis());
}
}
@@ -304,33 +302,21 @@ public class AutoRefreshSnapshotRunner implements Runnable {
}
}
- public void waitBuildSnapshotTaskDone() throws InterruptedException {
- while (true) {
- val doneCount = buildSnapshotFutures.keySet().stream().filter(Future::isDone).count();
- if (buildSnapshotFutures.size() == doneCount) {
- break;
- }
- cancelTimeoutFuture(buildSnapshotFutures);
- TimeUnit.SECONDS.sleep(10);
- }
- }
-
public void buildSnapshot(CheckSourceTableResult result) {
val needBuildSnapshots = sourceTableSnapshotMapping.get(result.getTableIdentity());
for (TableDesc tableDesc : needBuildSnapshots) {
val sourceTableCount = buildSnapshotCount.getOrDefault(tableDesc.getIdentity(), new AtomicInteger(0));
log.info("buildSnapshotCount is [{}], tableIdentity is [{}]", sourceTableCount, tableDesc.getIdentity());
if (sourceTableCount.getAndIncrement() == 0) {
- val thread = new BuildSnapshotThread();
- thread.setProject(project);
- thread.setConfig(projectConfig);
- thread.setRestTemplate(restTemplate);
- thread.setNeedRefresh(result.getNeedRefresh());
- thread.setNeedRefreshPartitionsValue(result.getNeedRefreshPartitionsValue());
- thread.setTableIdentity(tableDesc.getIdentity());
- thread.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol());
- val submit = jobPool.submit(thread, "success");
- buildSnapshotFutures.put(submit, System.currentTimeMillis());
+ val runnable = new BuildSnapshotRunnable();
+ runnable.setProject(project);
+ runnable.setConfig(projectConfig);
+ runnable.setRestTemplate(restTemplate);
+ runnable.setNeedRefresh(result.getNeedRefresh());
+ runnable.setNeedRefreshPartitionsValue(result.getNeedRefreshPartitionsValue());
+ runnable.setTableIdentity(tableDesc.getIdentity());
+ runnable.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol());
+ runnable.run();
}
buildSnapshotCount.put(tableDesc.getIdentity(), sourceTableCount);
}
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.java
index 539bf6ac08..8639c09d30 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.java
@@ -26,17 +26,23 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.PostConstruct;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.scheduler.EpochStartedNotifier;
+import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.util.AddressUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.epoch.EpochManager;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
-import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.annotation.Order;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -46,7 +52,7 @@ import org.springframework.web.client.RestTemplate;
import com.google.common.collect.Maps;
-import org.apache.kylin.metadata.epoch.EpochManager;
+import io.kyligence.kap.guava20.shaded.common.eventbus.Subscribe;
import lombok.Getter;
import lombok.val;
import lombok.extern.slf4j.Slf4j;
@@ -66,7 +72,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
@Component
-public class AutoRefreshSnapshotScheduler implements InitializingBean {
+public class AutoRefreshSnapshotScheduler {
private static final Integer THREAD_POOL_TASK_SCHEDULER_DEFAULT_POOL_SIZE = 20;
@Autowired
@Qualifier("projectScheduler")
@@ -228,7 +234,6 @@ public class AutoRefreshSnapshotScheduler implements InitializingBean {
}
}
- @Override
public void afterPropertiesSet() throws Exception {
log.info("AutoRefreshSnapshotScheduler init...");
val fs = HadoopUtil.getWorkingFileSystem();
@@ -257,3 +262,28 @@ public class AutoRefreshSnapshotScheduler implements InitializingBean {
}
}
}
+
+@Slf4j
+@Configuration
+@Order
+class AutoRefreshSnapshotConfig {
+ @Autowired
+ private AutoRefreshSnapshotScheduler scheduler;
+
+ @PostConstruct
+ public void init() {
+ val kylinConfig = KylinConfig.getInstanceFromEnv();
+ if (kylinConfig.isJobNode()) {
+ EventBusFactory.getInstance().register(this, false);
+ }
+ }
+
+ @Subscribe
+ public void registerScheduler(EpochStartedNotifier notifier) {
+ try {
+ scheduler.afterPropertiesSet();
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+}
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotThread.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnable.java
similarity index 99%
rename from src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotThread.java
rename to src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnable.java
index 4759260d54..e9ebd6c67a 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotThread.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnable.java
@@ -42,6 +42,7 @@ import org.apache.kylin.common.exception.KylinRuntimeException;
import org.apache.kylin.common.response.RestResponse;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.engine.spark.job.NSparkSnapshotJob;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.NExecutableManager;
@@ -53,7 +54,6 @@ import org.springframework.http.HttpMethod;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.kylin.engine.spark.job.NSparkSnapshotJob;
import io.kyligence.kap.guava20.shaded.common.collect.Lists;
import io.kyligence.kap.guava20.shaded.common.collect.Maps;
import io.kyligence.kap.guava20.shaded.common.collect.Sets;
@@ -64,7 +64,7 @@ import lombok.val;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class BuildSnapshotThread extends AbstractSchedulerRunnable {
+public class BuildSnapshotRunnable extends AbstractSchedulerRunnable {
private static final String BUILD_SNAPSHOT_ERROR_MESSAGE = "Project[%s] Snapshot[%s] buildSnapshot failed";
@Override
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableThread.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnable.java
similarity index 98%
rename from src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableThread.java
rename to src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnable.java
index d983fec99f..9fc0fd9d1d 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableThread.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnable.java
@@ -46,7 +46,7 @@ import lombok.val;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class CheckSourceTableThread extends AbstractSchedulerRunnable {
+public class CheckSourceTableRunnable extends AbstractSchedulerRunnable {
private static final String SNAPSHOT_TABLE_CHECK_ERROR_MESSAGE = "Project[%s] Snapshot source table[%s] check table stats Failed";
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotConfigTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotConfigTest.java
new file mode 100644
index 0000000000..450f0bc088
--- /dev/null
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotConfigTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.scheduler;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.scheduler.EpochStartedNotifier;
+import org.apache.kylin.common.scheduler.EventBusFactory;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import lombok.val;
+
+class AutoRefreshSnapshotConfigTest {
+ @Test
+ void testRegisterScheduler() throws Exception {
+ registerScheduler(true);
+ registerScheduler(false);
+ }
+
+ void registerScheduler(Boolean isJobNode) throws Exception {
+ try (val mockStatic = Mockito.mockStatic(EventBusFactory.class);
+ val configStatic = Mockito.mockStatic(KylinConfig.class)) {
+ val config = Mockito.mock(KylinConfig.class);
+ Mockito.when(config.isJobNode()).thenReturn(isJobNode);
+ configStatic.when(KylinConfig::getInstanceFromEnv).thenReturn(config);
+
+ val buildConfig = new AutoRefreshSnapshotConfig();
+ val eventBus = Mockito.mock(EventBusFactory.class);
+ mockStatic.when(EventBusFactory::getInstance).thenReturn(eventBus);
+ buildConfig.init();
+ val scheduler = Mockito.mock(AutoRefreshSnapshotScheduler.class);
+ ReflectionTestUtils.setField(buildConfig, "scheduler", scheduler);
+
+ Mockito.doNothing().when(scheduler).afterPropertiesSet();
+ buildConfig.registerScheduler(new EpochStartedNotifier());
+
+ Mockito.doThrow(new Exception("test")).when(scheduler).afterPropertiesSet();
+ buildConfig.registerScheduler(new EpochStartedNotifier());
+ }
+ }
+}
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunnerTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunnerTest.java
index ae62d5c7a4..5dd01bf60a 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunnerTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunnerTest.java
@@ -35,7 +35,6 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
@@ -67,7 +66,6 @@ import io.kyligence.kap.guava20.shaded.common.collect.Lists;
import io.kyligence.kap.guava20.shaded.common.collect.Maps;
import io.kyligence.kap.guava20.shaded.common.collect.Sets;
import lombok.val;
-import lombok.var;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -116,7 +114,6 @@ class AutoRefreshSnapshotRunnerTest {
runner.doRun();
assertTrue(CollectionUtils.isEmpty(runner.getCheckSourceTableQueue()));
assertTrue(MapUtils.isEmpty(runner.getBuildSnapshotCount()));
- assertTrue(MapUtils.isEmpty(runner.getBuildSnapshotFutures()));
assertTrue(MapUtils.isEmpty(runner.getCheckSourceTableFutures()));
assertTrue(MapUtils.isEmpty(runner.getSourceTableSnapshotMapping()));
} finally {
@@ -247,6 +244,16 @@ class AutoRefreshSnapshotRunnerTest {
if (i < 14) {
tables.add(allTables.get(i));
}
+ if (allTables.get(i).isView()) {
+ tables.add(allTables.get(i));
+ val sourceTables = Sets.<String> newHashSet();
+ for (int j = 0; j < 7; j++) {
+ sourceTables.add("default.table_" + j);
+ excepted.add("default.table_" + j);
+ }
+ sourceTables.add(allTables.get(i).getIdentity().toLowerCase(Locale.ROOT));
+ viewTableMapping.put(allTables.get(i).getIdentity(), sourceTables);
+ }
if (i > 7) {
val sourceTables = Sets.<String> newHashSet();
for (int j = 0; j < 7; j++) {
@@ -307,9 +314,9 @@ class AutoRefreshSnapshotRunnerTest {
val sourceTables = sourceTableSnapshotMapping.keySet();
runner.getSourceTableSnapshotMapping().putAll(sourceTableSnapshotMapping);
- try (val ignored = Mockito.mockConstruction(CheckSourceTableThread.class,
+ try (val ignored = Mockito.mockConstruction(CheckSourceTableRunnable.class,
(mock, context) -> Mockito.doNothing().when(mock).checkTable())) {
- try (val ignored2 = Mockito.mockConstruction(BuildSnapshotThread.class,
+ try (val ignored2 = Mockito.mockConstruction(BuildSnapshotRunnable.class,
(mock, context) -> Mockito.doNothing().when(mock).buildSnapshot())) {
runner.checkSourceTable(sourceTables);
@@ -334,9 +341,6 @@ class AutoRefreshSnapshotRunnerTest {
exceptedTmp.addAll(tableDescs);
val excepted = exceptedTmp.stream().distinct().collect(Collectors.toList());
assertEquals(excepted.size(), buildSnapshotCount.size());
-
- val buildSnapshotFutures = runner.getBuildSnapshotFutures();
- assertEquals(excepted.size(), buildSnapshotFutures.size());
}
}
} finally {
@@ -344,34 +348,6 @@ class AutoRefreshSnapshotRunnerTest {
}
}
- @Test
- void waitBuildSnapshotTaskDone() {
- val project = "default";
- try {
- val runner = AutoRefreshSnapshotRunner.getInstance(project);
- val tasks = Lists.<Future<String>> newArrayList();
- for (int i = 0; i < 5; i++) {
- val futureTask = new FutureTask<String>(() -> null);
- tasks.add(futureTask);
- runner.getBuildSnapshotFutures().put(futureTask, System.currentTimeMillis());
- }
- val result = new AtomicBoolean(false);
- val thread = new Thread(() -> {
- try {
- runner.waitBuildSnapshotTaskDone();
- result.set(true);
- } catch (InterruptedException e) {
- log.error(e.getMessage(), e);
- }
- });
- thread.start();
- tasks.forEach(task -> task.cancel(true));
- await().atMost(new Duration(12, SECONDS)).untilAsserted(() -> assertTrue(result.get()));
- } finally {
- AutoRefreshSnapshotRunner.shutdown(project);
- }
- }
-
@Test
void cancelTimeoutFuture() {
val project = RandomUtil.randomUUIDStr();
@@ -385,11 +361,11 @@ class AutoRefreshSnapshotRunnerTest {
for (int i = 0; i < 5; i++) {
val futureTask = new FutureTask<String>(() -> null);
tasks.add(futureTask);
- runner.getBuildSnapshotFutures().put(futureTask, System.currentTimeMillis());
+ runner.getCheckSourceTableFutures().put(futureTask, System.currentTimeMillis());
}
await().pollDelay(new Duration(2, SECONDS)).until(() -> true);
- runner.cancelTimeoutFuture(runner.getBuildSnapshotFutures());
- runner.getBuildSnapshotFutures().keySet().forEach(future -> {
+ runner.cancelTimeoutFuture(runner.getCheckSourceTableFutures());
+ runner.getCheckSourceTableFutures().keySet().forEach(future -> {
assertTrue(future.isCancelled());
assertTrue(future.isDone());
});
@@ -457,16 +433,6 @@ class AutoRefreshSnapshotRunnerTest {
val overrideProps = Maps.<String, String> newLinkedHashMap();
projectManager.createProject(project, "test", "", overrideProps);
val runner = AutoRefreshSnapshotRunner.getInstance(project);
- for (int i = 0; i < 5; i++) {
- val futureTask = new FutureTask<String>(() -> null);
- runner.getBuildSnapshotFutures().put(futureTask, System.currentTimeMillis());
- if (i % 2 == 0) {
- futureTask.cancel(true);
- }
- }
- runner.cancelFuture(runner.getBuildSnapshotFutures());
- var actual = runner.getBuildSnapshotFutures().keySet().stream().filter(Future::isDone).count();
- assertEquals(runner.getBuildSnapshotFutures().size(), actual);
for (int i = 0; i < 5; i++) {
val futureTask = new FutureTask<String>(() -> null);
@@ -476,7 +442,7 @@ class AutoRefreshSnapshotRunnerTest {
}
}
runner.cancelFuture(runner.getCheckSourceTableFutures());
- actual = runner.getCheckSourceTableFutures().keySet().stream().filter(Future::isDone).count();
+ val actual = runner.getCheckSourceTableFutures().keySet().stream().filter(Future::isDone).count();
assertEquals(runner.getCheckSourceTableFutures().size(), actual);
} finally {
AutoRefreshSnapshotRunner.shutdown(project);
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotThreadTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnableTest.java
similarity index 97%
rename from src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotThreadTest.java
rename to src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnableTest.java
index 214ac7954d..b2d856887e 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotThreadTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnableTest.java
@@ -36,6 +36,7 @@ import org.apache.kylin.common.exception.KylinRuntimeException;
import org.apache.kylin.common.response.RestResponse;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.engine.spark.job.NSparkSnapshotJob;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.NExecutableManager;
@@ -51,7 +52,6 @@ import org.springframework.web.client.RestTemplate;
import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.kylin.engine.spark.job.NSparkSnapshotJob;
import io.kyligence.kap.guava20.shaded.common.collect.Lists;
import io.kyligence.kap.guava20.shaded.common.collect.Maps;
import io.kyligence.kap.guava20.shaded.common.collect.Sets;
@@ -59,12 +59,12 @@ import lombok.val;
import lombok.var;
@MetadataInfo
-class BuildSnapshotThreadTest {
+class BuildSnapshotRunnableTest {
private final RestTemplate restTemplate = Mockito.mock(RestTemplate.class);
@Test
void buildSnapshot() throws JsonProcessingException {
- val thread = new BuildSnapshotThread();
+ val thread = new BuildSnapshotRunnable();
thread.setProject("project");
thread.setConfig(KylinConfig.readSystemKylinConfig());
thread.setRestTemplate(restTemplate);
@@ -112,7 +112,7 @@ class BuildSnapshotThreadTest {
@Test
void buildSnapshotFailed() throws JsonProcessingException {
- val thread = new BuildSnapshotThread();
+ val thread = new BuildSnapshotRunnable();
thread.setProject("project");
thread.setConfig(KylinConfig.readSystemKylinConfig());
thread.setRestTemplate(restTemplate);
@@ -149,7 +149,7 @@ class BuildSnapshotThreadTest {
@Test
void checkSnapshotJobFile() {
- val thread = new BuildSnapshotThread();
+ val thread = new BuildSnapshotRunnable();
thread.setConfig(KylinConfig.getInstanceFromEnv());
thread.setTableIdentity("default.table_" + RandomUtil.randomUUIDStr().replace("-", "_"));
val jobId = RandomUtil.randomUUIDStr();
@@ -182,7 +182,7 @@ class BuildSnapshotThreadTest {
@Test
void checkAutoRefreshJobSuccessOrRunning() {
val jobId = RandomUtil.randomUUIDStr();
- val thread = new BuildSnapshotThread();
+ val thread = new BuildSnapshotRunnable();
thread.setConfig(KylinConfig.getInstanceFromEnv());
assertFalse(thread.checkAutoRefreshJobSuccessOrRunning(jobId));
@@ -205,7 +205,7 @@ class BuildSnapshotThreadTest {
@Test
void snapshotJobFile() {
- val thread = new BuildSnapshotThread();
+ val thread = new BuildSnapshotRunnable();
thread.setConfig(KylinConfig.getInstanceFromEnv());
thread.setTableIdentity("default.table_" + RandomUtil.randomUUIDStr().replace("-", "_"));
val jobId = RandomUtil.randomUUIDStr();
@@ -219,7 +219,7 @@ class BuildSnapshotThreadTest {
@Test
void snapshotJobFileNotExists() {
- val thread = new BuildSnapshotThread();
+ val thread = new BuildSnapshotRunnable();
thread.setConfig(KylinConfig.getInstanceFromEnv());
thread.setTableIdentity("default.table_" + RandomUtil.randomUUIDStr().replace("-", "_"));
val snapshotJob = thread.readSnapshotJobFile();
@@ -228,7 +228,7 @@ class BuildSnapshotThreadTest {
@Test
void checkNeedBuildPartitionAndSetTableOption() throws JsonProcessingException {
- val thread = new BuildSnapshotThread();
+ val thread = new BuildSnapshotRunnable();
thread.setTableIdentity("default.table");
val req = Maps.newHashMap();
val runningJobs = Lists.<NSparkSnapshotJob> newArrayList();
@@ -279,7 +279,7 @@ class BuildSnapshotThreadTest {
Mockito.when(executableManager.listExecByJobTypeAndStatus(ExecutableState::isRunning, SNAPSHOT_BUILD,
SNAPSHOT_REFRESH)).thenReturn(runningJobs);
- val thread = new BuildSnapshotThread();
+ val thread = new BuildSnapshotRunnable();
thread.setTableIdentity("default.table");
thread.setProject("default");
try {
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableThreadTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnableTest.java
similarity index 93%
rename from src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableThreadTest.java
rename to src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnableTest.java
index 920e89bd76..db82b60d66 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableThreadTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnableTest.java
@@ -42,12 +42,12 @@ import io.kyligence.kap.guava20.shaded.common.collect.Sets;
import lombok.val;
@MetadataInfo
-class CheckSourceTableThreadTest {
+class CheckSourceTableRunnableTest {
private final RestTemplate restTemplate = Mockito.mock(RestTemplate.class);
@Test
void checkTable() throws JsonProcessingException {
- val thread = new CheckSourceTableThread();
+ val thread = new CheckSourceTableRunnable();
thread.setProject("project");
thread.setConfig(KylinConfig.readSystemKylinConfig());
thread.setTableIdentity("default.table");
@@ -72,7 +72,7 @@ class CheckSourceTableThreadTest {
@Test
void checkTableFailed() {
try {
- val thread = new CheckSourceTableThread();
+ val thread = new CheckSourceTableRunnable();
thread.setProject("project");
thread.setConfig(KylinConfig.readSystemKylinConfig());
thread.setTableIdentity("default.table");
@@ -83,9 +83,8 @@ class CheckSourceTableThreadTest {
thread.checkTable();
} catch (Exception e) {
assertTrue(e instanceof KylinRuntimeException);
- assertEquals(
- "Project[project] Snapshot source table[default.table] check table stats Failed",
+ assertEquals("Project[project] Snapshot source table[default.table] check table stats Failed",
e.getMessage());
}
}
-}
\ No newline at end of file
+}
diff --git a/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala b/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala
index 635b6e54f7..6663a71041 100644
--- a/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala
+++ b/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala
@@ -194,9 +194,8 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local
val tableIdentity = table.qualifiedName.toLowerCase(Locale.ROOT)
val locationPath = table.location.getPath
val locationFilesStatus: util.List[FileStatus] = snapshotSourceTableStatsService.getLocationFileStatus(locationPath)
- val snapshotTablesLocationsJson = Maps.newHashMap[String, SnapshotSourceTableStats]()
- snapshotSourceTableStatsService.createSnapshotSourceTableStats(locationPath, config,
- locationFilesStatus, snapshotTablesLocationsJson)
+ val snapshotTablesLocationsJson = snapshotSourceTableStatsService.createSnapshotSourceTableStats(locationPath, config,
+ locationFilesStatus)
snapshotSourceTableStatsService.writeSourceTableStats(DEFAULT_PROJECT, tableIdentity, snapshotTablesLocationsJson)
val fromJson = snapshotSourceTableStatsService.getSnapshotSourceTableStatsJsonFromHDFS(DEFAULT_PROJECT, tableIdentity).getSecond
@@ -522,4 +521,4 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local
assertFalse(checkStatsFile)
})
}
-}
\ No newline at end of file
+}
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java
index 360ecad17f..8dcc09980d 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java
@@ -141,6 +141,7 @@ public class SnapshotSourceTableStatsService extends BasicService {
} catch (Exception e) {
log.info("Project[{}] [{}.{}] refresh check and save snapshot table location files failed", project,
database, table);
+ log.error(e.getMessage(), e);
return new SnapshotSourceTableStatsResponse(false);
}
}
@@ -197,8 +198,8 @@ public class SnapshotSourceTableStatsService extends BasicService {
}
val needRefresh = checkLocation(location, filesStatus, snapshotSourceTableStatsJson, projectConfig);
if (Boolean.FALSE.equals(snapshotSourceTableStatsJsonExist) || Boolean.TRUE.equals(needRefresh)) {
- createSnapshotSourceTableStats(location, projectConfig, filesStatus, snapshotSourceTableStatsJson);
- writeSourceTableStats(project, tableIdentity, snapshotSourceTableStatsJson);
+ val newSnapshotSourceTableStatsJson = createSnapshotSourceTableStats(location, projectConfig, filesStatus);
+ writeSourceTableStats(project, tableIdentity, newSnapshotSourceTableStatsJson);
}
if (Boolean.FALSE.equals(snapshotSourceTableStatsJsonExist)) {
return projectConfig.isSnapshotFirstAutoRefreshEnabled();
@@ -279,10 +280,10 @@ public class SnapshotSourceTableStatsService extends BasicService {
tableFilesModifyTimesAndSize.get(FILES_SIZE));
}
- public void createSnapshotSourceTableStats(String location, KylinConfig config,
- List<FileStatus> locationFilesStatus, Map<String, SnapshotSourceTableStats> snapshotSourceTableStatsJson) {
- val sourceTableStats = snapshotSourceTableStatsJson.computeIfAbsent(location,
- key -> new SnapshotSourceTableStats());
+ public Map<String, SnapshotSourceTableStats> createSnapshotSourceTableStats(String location, KylinConfig config,
+ List<FileStatus> locationFilesStatus) {
+ Map<String, SnapshotSourceTableStats> newSnapshotSourceTableStatsJson = Maps.newHashMap();
+ val sourceTableStats = new SnapshotSourceTableStats();
val filesSize = Lists.<Long> newArrayList();
val filesModificationTime = Lists.<Long> newArrayList();
locationFilesStatus.stream().limit(config.getSnapshotAutoRefreshFetchFilesCount()).forEach(fileStatus -> {
@@ -293,7 +294,8 @@ public class SnapshotSourceTableStatsService extends BasicService {
sourceTableStats.setFilesModificationTime(filesModificationTime);
sourceTableStats.setFilesCount(locationFilesStatus.size());
- snapshotSourceTableStatsJson.put(location, sourceTableStats);
+ newSnapshotSourceTableStatsJson.put(location, sourceTableStats);
+ return newSnapshotSourceTableStatsJson;
}
public void writeSourceTableStats(String project, String tableIdentity,
@@ -332,11 +334,12 @@ public class SnapshotSourceTableStatsService extends BasicService {
val needRefresh = checkPartitionsLocation(partitions, snapshotSourceTableStatsJson, needRefreshPartitions,
needSavePartitionsFilesStatus, projectConfig);
if (Boolean.FALSE.equals(snapshotSourceTableStatsJsonExist) || Boolean.TRUE.equals(needRefresh)) {
+ Map<String, SnapshotSourceTableStats> newSnapshotSourceTableStatsJson = Maps.newHashMap();
for (CatalogTablePartition partition : partitions) {
createPartitionSnapshotSourceTableStats(partition, needSavePartitionsFilesStatus,
- snapshotSourceTableStatsJson, projectConfig);
+ newSnapshotSourceTableStatsJson, projectConfig);
}
- writeSourceTableStats(project, tableIdentity, snapshotSourceTableStatsJson);
+ writeSourceTableStats(project, tableIdentity, newSnapshotSourceTableStatsJson);
}
if (Boolean.FALSE.equals(snapshotSourceTableStatsJsonExist)) {
return projectConfig.isSnapshotFirstAutoRefreshEnabled();
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/TestSnapshotSourceTableStatsService.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/TestSnapshotSourceTableStatsService.java
deleted file mode 100644
index 45b6850d92..0000000000
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/TestSnapshotSourceTableStatsService.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.rest.service;
-
-import org.apache.kylin.junit.annotation.MetadataInfo;
-import org.junit.Test;
-import org.mockito.InjectMocks;
-import org.mockito.Mockito;
-
-@MetadataInfo
-class TestSnapshotSourceTableStatsService {
- private static final String DEFAULT_PROJECT = "default";
- @InjectMocks
- private final SnapshotSourceTableStatsService locationService = Mockito.spy(SnapshotSourceTableStatsService.class);
-
- @Test
- void saveSnapshotViewMapping() {
-
- }
-}