You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/05 10:21:00 UTC

[kylin] 08/22: add second storage index lock check

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit b04afb32a291a1ba7c223f3b71525444e931539c
Author: Shuai li <lo...@live.cn>
AuthorDate: Fri Oct 14 11:06:41 2022 +0800

    add second storage index lock check
---
 .../kyligence/kap/secondstorage/SecondStorageIndexTest.java | 13 ++++++++++---
 .../kap/secondstorage/management/SecondStorageService.java  |  8 ++++++--
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
index 38065420ca..0a8ad1b714 100644
--- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
+++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
@@ -28,7 +28,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -349,8 +348,8 @@ public class SecondStorageIndexTest implements JobWaiter {
         String jobId = updatePrimaryIndexAndSecondaryIndex(modelName, null, Sets.newHashSet());
         waitJobEnd(getProject(), jobId);
 
-        assertThrows(String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), getProject()),
-                KylinException.class, () -> updatePrimaryIndexAndSecondaryIndex(modelName, null, secondaryIndex));
+        assertThrows(MsgPicker.getMsg().getSecondStorageConcurrentOperate(), KylinException.class,
+                () -> updatePrimaryIndexAndSecondaryIndex(modelName, null, secondaryIndex));
         clickhouse[0].start();
         ClickHouseUtils.internalConfigClickHouse(clickhouse, replica);
 
@@ -465,6 +464,14 @@ public class SecondStorageIndexTest implements JobWaiter {
         assertEquals(jobCnt, getNExecutableManager().getAllExecutables().stream()
                 .filter(job -> job instanceof ClickHouseRefreshSecondaryIndexJob).count());
 
+        // test range lock
+        val lockSecondaryIndex = Sets.newHashSet("TEST_KYLIN_FACT.TRANS_ID");
+        SegmentRange<Long> range = SegmentRange.TimePartitionedSegmentRange.createInfinite();
+        SecondStorageLockUtils.acquireLock(modelId, range).lock();
+        assertThrows(MsgPicker.getMsg().getSecondStorageConcurrentOperate(), KylinException.class,
+                () -> updatePrimaryIndexAndSecondaryIndex(modelName, null, lockSecondaryIndex));
+        SecondStorageLockUtils.unlock(modelId, range);
+
         tableData = getTableFlow(modelId).getTableDataList().get(0);
         partition = tableData.getPartitions().get(0);
         assertTrue(partition.getSecondaryIndexColumns().isEmpty());
diff --git a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
index b0886204aa..460e40faed 100644
--- a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
+++ b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
@@ -1159,13 +1159,17 @@ public class SecondStorageService extends BasicService implements SecondStorageU
         List<AbstractExecutable> jobs = getRelationJobsWithoutFinish(project, modelId);
         if (!jobs.isEmpty()) {
             throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS,
-                    String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), project));
+                    MsgPicker.getMsg().getSecondStorageConcurrentOperate());
         }
         jobs = getJobs(project, modelId, Sets.newHashSet(ExecutableState.ERROR),
                 Sets.newHashSet(JobTypeEnum.SECOND_STORAGE_REFRESH_SECONDARY_INDEXES));
         if (!jobs.isEmpty()) {
             throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS,
-                    String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), project));
+                    MsgPicker.getMsg().getSecondStorageConcurrentOperate());
+        }
+        if (SecondStorageLockUtils.containsKey(modelId)) {
+            throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS,
+                    MsgPicker.getMsg().getSecondStorageConcurrentOperate());
         }
     }