You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/13 10:25:19 UTC
[kylin] 12/25: Fix secondstorage skipping index job error
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b736b25dfca043e98c7f22690ea5d7b038ffbad2
Author: Shuai li <lo...@live.cn>
AuthorDate: Wed Oct 12 19:54:52 2022 +0800
Fix secondstorage skipping index job error
---
.../kap/secondstorage/SecondStorageIndexTest.java | 11 +++++-----
.../management/SecondStorageService.java | 24 +++++++++++++++++-----
2 files changed, 24 insertions(+), 11 deletions(-)
diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
index fbfe232328..5f561c7183 100644
--- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
+++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
@@ -28,6 +28,7 @@ import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;
@@ -348,8 +349,8 @@ public class SecondStorageIndexTest implements JobWaiter {
String jobId = updatePrimaryIndexAndSecondaryIndex(modelName, null, Sets.newHashSet());
waitJobEnd(getProject(), jobId);
- assertThrows(MsgPicker.getMsg().getSecondStorageConcurrentOperate(), KylinException.class,
- () -> updatePrimaryIndexAndSecondaryIndex(modelName, null, secondaryIndex));
+ assertThrows(String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), getProject()),
+ KylinException.class, () -> updatePrimaryIndexAndSecondaryIndex(modelName, null, secondaryIndex));
clickhouse[0].start();
ClickHouseUtils.internalConfigClickHouse(clickhouse, replica);
@@ -562,11 +563,9 @@ public class SecondStorageIndexTest implements JobWaiter {
assertEquals(1, tableEntity.getSecondaryIndexColumns().size());
assertTrue(tableEntity.getSecondaryIndexColumns().contains(0));
- buildIncrementalLoadQuery("2012-01-02", "2012-01-03",
- new HashSet<>(
- NIndexPlanManager.getInstance(getConfig(), getProject()).getIndexPlan(modelId).getAllLayouts()),
+ buildIncrementalLoadQuery("2012-01-02", "2012-01-03", new HashSet<>(getIndexPlan(modelId).getAllLayouts()),
modelId);
- waitAllJobFinish();
+ waitAllJoEnd();
for (TableData tableData : getTableFlow(modelId).getTableDataList()) {
for (TablePartition partition : tableData.getPartitions()) {
diff --git a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
index 9c4fb6c2fc..d10474b43e 100644
--- a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
+++ b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
@@ -326,7 +326,6 @@ public class SecondStorageService extends BasicService implements SecondStorageU
deleteLayoutChTable(project, modelId, layout.getId());
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
getTablePlan(project, modelId).update(tp -> tp.updatePrimaryIndexColumns(layout.getId(), columns));
- deleteLayoutChTable(project, modelId, layout.getId());
return null;
}, project, 1, UnitOfWork.DEFAULT_EPOCH_ID);
}
@@ -370,11 +369,20 @@ public class SecondStorageService extends BasicService implements SecondStorageU
}
private void deleteLayoutChTable(String project, String modelId, long layoutId) {
- String database = NameUtil.getDatabase(getConfig(), project);
+ KylinConfig config = getConfig();
+ String database = NameUtil.getDatabase(config, project);
String table = NameUtil.getTable(modelId, layoutId);
- for (NodeGroup nodeGroup : SecondStorageUtil.listNodeGroup(getConfig(), project)) {
- nodeGroup.getNodeNames().forEach(node -> SecondStorageFactoryUtils
- .createDatabaseOperator(SecondStorageNodeHelper.resolve(node)).dropTable(database, table));
+ for (NodeGroup nodeGroup : SecondStorageUtil.listNodeGroup(config, project)) {
+ nodeGroup.getNodeNames().forEach(node -> {
+ DatabaseOperator operator = SecondStorageFactoryUtils
+ .createDatabaseOperator(SecondStorageNodeHelper.resolve(node));
+ try {
+ operator.dropTable(database, table);
+ } catch (Exception e) {
+ throw new KylinException(SECOND_STORAGE_NODE_NOT_AVAILABLE,
+ MsgPicker.getMsg().getSecondStorageNodeNotAvailable(node), e);
+ }
+ });
}
}
@@ -1149,6 +1157,12 @@ public class SecondStorageService extends BasicService implements SecondStorageU
private void checkUpdateIndex(String project, String modelId) {
SecondStorageUtil.validateProjectLock(project, Collections.singletonList(LockTypeEnum.LOAD.name()));
List<AbstractExecutable> jobs = getRelationJobsWithoutFinish(project, modelId);
+ if (!jobs.isEmpty()) {
+ throw new KylinException(JobErrorCode.SECOND_STORAGE_PROJECT_JOB_EXISTS,
+ String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), project));
+ }
+ jobs = getJobs(project, modelId, Sets.newHashSet(ExecutableState.ERROR),
+ Sets.newHashSet(JobTypeEnum.SECOND_STORAGE_REFRESH_SECONDARY_INDEXES));
if (!jobs.isEmpty()) {
throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS,
MsgPicker.getMsg().getSecondStorageConcurrentOperate());