You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/05 10:20:53 UTC

[kylin] 01/22: fix secondstorage index refresh locked

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7cab52b1f03102d43bfac700f3b7cddc3de1cd96
Author: Shuai li <lo...@live.cn>
AuthorDate: Wed Oct 12 19:54:30 2022 +0800

    fix secondstorage index refresh locked
---
 .../kap/secondstorage/SecondStorageIndexTest.java  |  2 +-
 .../job/ClickhouseRefreshSecondaryIndex.java       | 47 +++--------------
 .../kap/clickhouse/job/RefreshSecondaryIndex.java  | 60 ++++++++++------------
 .../management/SecondStorageService.java           |  8 +--
 4 files changed, 39 insertions(+), 78 deletions(-)

diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
index b5473c9f1f..767b045172 100644
--- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
+++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
@@ -349,7 +349,7 @@ public class SecondStorageIndexTest implements JobWaiter {
         String jobId = updatePrimaryIndexAndSecondaryIndex(modelName, null, Sets.newHashSet());
         waitJobEnd(getProject(), jobId);
 
-        assertThrows(String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), getProject()),
+        assertThrows(String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), getProject()),
                 KylinException.class, () -> updatePrimaryIndexAndSecondaryIndex(modelName, null, secondaryIndex));
         clickhouse[0].start();
         ClickHouseUtils.internalConfigClickHouse(clickhouse, replica);
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java
index 6f818af516..632cf60a88 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java
@@ -22,14 +22,12 @@ import static io.kyligence.kap.secondstorage.SecondStorageConstants.STEP_SECOND_
 import static io.kyligence.kap.secondstorage.SecondStorageUtil.getTableFlow;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
@@ -39,9 +37,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
-import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -102,14 +98,15 @@ public class ClickhouseRefreshSecondaryIndex extends AbstractExecutable {
                 }
             }
 
+            val dataflow = NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(modelId);
+            String database = NameUtil.getDatabase(getConfig(), getProject());
+            String table = NameUtil.getTable(dataflow, layoutId);
+            List<Future<?>> results = Lists.newArrayList();
             List<SecondStorageNode> nodes = SecondStorageUtil.listProjectNodes(getProject());
-            List<RefreshSecondaryIndex> allJob = getAddIndexJob(nodes, newIndexes, layoutId);
-            allJob.addAll(getToBeDeleteIndexJob(nodes, toBeDeleteIndexed, layoutId));
-
-            List<Future<?>> results = new ArrayList<>();
             val taskPool = new ThreadPoolExecutor(nodes.size(), nodes.size(), 0L, TimeUnit.MILLISECONDS,
                     new LinkedBlockingQueue<>(), new NamedThreadFactory("Refresh Tiered Storage Index"));
-            allJob.forEach(job -> results.add(taskPool.submit(job::refresh)));
+            nodes.forEach(node -> results.add(taskPool.submit(() -> new RefreshSecondaryIndex(node.getName(), database,
+                    table, newIndexes, toBeDeleteIndexed, dataflow).refresh())));
 
             try {
                 for (Future<?> result : results) {
@@ -128,36 +125,4 @@ public class ClickhouseRefreshSecondaryIndex extends AbstractExecutable {
             return ExecuteResult.createSucceed();
         });
     }
-
-    private List<RefreshSecondaryIndex> getAddIndexJob(List<SecondStorageNode> nodes, Set<Integer> newIndexes,
-            long layoutId) {
-        String modelId = getTargetSubject();
-        val indexPlan = NIndexPlanManager.getInstance(getConfig(), project).getIndexPlan(modelId);
-
-        if (indexPlan == null || indexPlan.getLayoutEntity(layoutId) == null) {
-            return Lists.newArrayList();
-        }
-
-        LayoutEntity layout = indexPlan.getLayoutEntity(layoutId);
-        String database = NameUtil.getDatabase(getConfig(), getProject());
-        String table = NameUtil.getTable(NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(modelId),
-                layoutId);
-        return nodes.stream()
-                .flatMap(node -> newIndexes.stream().map(column -> new RefreshSecondaryIndex(node.getName(), database,
-                        table, column, layout, RefreshSecondaryIndex.Type.ADD)))
-                .collect(Collectors.toList());
-    }
-
-    private List<RefreshSecondaryIndex> getToBeDeleteIndexJob(List<SecondStorageNode> nodes,
-            Set<Integer> toBeDeleteIndexed, long layoutId) {
-        String modelId = getTargetSubject();
-        String database = NameUtil.getDatabase(getConfig(), getProject());
-        String table = NameUtil.getTable(NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(modelId),
-                layoutId);
-
-        return nodes.stream()
-                .flatMap(node -> toBeDeleteIndexed.stream().map(column -> new RefreshSecondaryIndex(node.getName(),
-                        database, table, column, null, RefreshSecondaryIndex.Type.DELETE)))
-                .collect(Collectors.toList());
-    }
 }
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java
index 6c6c71193c..8b021828cc 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java
@@ -26,9 +26,7 @@ import java.util.Set;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.cube.model.LayoutEntity;
-import org.apache.kylin.metadata.cube.model.NDataflowManager;
-import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.cube.model.NDataflow;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -49,33 +47,32 @@ import lombok.extern.slf4j.Slf4j;
 @Getter
 @Slf4j
 public class RefreshSecondaryIndex {
-
     @JsonProperty("node")
     private String node;
     @JsonProperty("database")
     private String database;
     @JsonProperty("table")
     private String table;
-    @JsonProperty("column_id")
-    private Integer columnId;
-    @JsonProperty("type")
-    private Type type;
+    @JsonProperty("add_indexes")
+    private Set<Integer> addIndexes;
+    @JsonProperty("delete_indexes")
+    private Set<Integer> deleteIndexes;
 
     @JsonIgnore
-    private LayoutEntity layoutEntity;
+    private NDataflow dataflow;
 
     public RefreshSecondaryIndex() {
         // empty
     }
 
-    public RefreshSecondaryIndex(String node, String database, String table, Integer columnId,
-            LayoutEntity layoutEntity, Type type) {
+    public RefreshSecondaryIndex(String node, String database, String table, Set<Integer> addIndexes,
+            Set<Integer> deleteIndexes, NDataflow dataflow) {
         this.node = node;
         this.database = database;
         this.table = table;
-        this.columnId = columnId;
-        this.layoutEntity = layoutEntity;
-        this.type = type;
+        this.dataflow = dataflow;
+        this.addIndexes = addIndexes;
+        this.deleteIndexes = deleteIndexes;
     }
 
     public void refresh() {
@@ -88,29 +85,31 @@ public class RefreshSecondaryIndex {
                 return;
             }
             Set<String> existSkipIndex = existSkippingIndex(clickHouse, database, table);
-            String column = getPrefixColumn(String.valueOf(columnId));
-            String indexName = ClickHouseNameUtil.getSkippingIndexName(table, column);
-            if (type == Type.ADD) {
-                addSkippingIndex(clickHouse, tableIdentifier, column, indexName, existSkipIndex);
-            } else if (type == Type.DELETE) {
-                deleteSkippingIndex(clickHouse, tableIdentifier, indexName, existSkipIndex);
+
+            for (Integer deleteIndexColumnId : deleteIndexes) {
+                deleteSkippingIndex(clickHouse, tableIdentifier, deleteIndexColumnId, existSkipIndex);
+            }
+
+            for (Integer addIndexColumnId : addIndexes) {
+                addSkippingIndex(clickHouse, tableIdentifier, addIndexColumnId, existSkipIndex);
             }
         } catch (SQLException e) {
-            log.error("node {} clean index {}.{} failed", node, database, table);
+            log.error("node {} update index {}.{} failed", node, database, table);
             ExceptionUtils.rethrow(e);
         }
     }
 
-    private void addSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, String column,
-            String indexName, Set<String> existSkipIndex) throws SQLException {
-        NDataModel model = layoutEntity.getModel();
-        KylinConfig modelConfig = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), model.getProject())
-                .getDataflow(model.getId()).getConfig();
+    private void addSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, int columnId,
+            Set<String> existSkipIndex) throws SQLException {
+        String column = getPrefixColumn(String.valueOf(columnId));
+        String indexName = ClickHouseNameUtil.getSkippingIndexName(table, column);
+        KylinConfig modelConfig = dataflow.getConfig();
         int granularity = modelConfig.getSecondStorageSkippingIndexGranularity();
         val render = new ClickHouseRender();
 
         String expr = SkippingIndexChooser
-                .getSkippingIndexType(layoutEntity.getOrderedDimensions().get(columnId).getType()).toSql(modelConfig);
+                .getSkippingIndexType(dataflow.getModel().getEffectiveDimensions().get(columnId).getType())
+                .toSql(modelConfig);
         AlterTable alterTable = new AlterTable(tableIdentifier,
                 new AlterTable.ManipulateIndex(indexName, column, expr, granularity));
         AlterTable materializeTable = new AlterTable(tableIdentifier,
@@ -122,8 +121,9 @@ public class RefreshSecondaryIndex {
         clickHouse.apply(materializeTable.toSql(render));
     }
 
-    private void deleteSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, String indexName,
+    private void deleteSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, int columnId,
             Set<String> existSkipIndex) throws SQLException {
+        String indexName = ClickHouseNameUtil.getSkippingIndexName(table, getPrefixColumn(String.valueOf(columnId)));
         if (!existSkipIndex.contains(indexName)) {
             return;
         }
@@ -150,8 +150,4 @@ public class RefreshSecondaryIndex {
 
         return Sets.newHashSet();
     }
-
-    enum Type {
-        ADD, DELETE;
-    }
 }
diff --git a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
index eb43fddf83..b0886204aa 100644
--- a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
+++ b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
@@ -1158,14 +1158,14 @@ public class SecondStorageService extends BasicService implements SecondStorageU
         SecondStorageUtil.validateProjectLock(project, Collections.singletonList(LockTypeEnum.LOAD.name()));
         List<AbstractExecutable> jobs = getRelationJobsWithoutFinish(project, modelId);
         if (!jobs.isEmpty()) {
-            throw new KylinException(JobErrorCode.SECOND_STORAGE_PROJECT_JOB_EXISTS,
-                    String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), project));
+            throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS,
+                    String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), project));
         }
         jobs = getJobs(project, modelId, Sets.newHashSet(ExecutableState.ERROR),
                 Sets.newHashSet(JobTypeEnum.SECOND_STORAGE_REFRESH_SECONDARY_INDEXES));
         if (!jobs.isEmpty()) {
-            throw new KylinException(JobErrorCode.SECOND_STORAGE_PROJECT_JOB_EXISTS,
-                    String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), project));
+            throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS,
+                    String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), project));
         }
     }