You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/05 10:20:53 UTC
[kylin] 01/22: fix secondstorage index refresh locked
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 7cab52b1f03102d43bfac700f3b7cddc3de1cd96
Author: Shuai li <lo...@live.cn>
AuthorDate: Wed Oct 12 19:54:30 2022 +0800
fix secondstorage index refresh locked
---
.../kap/secondstorage/SecondStorageIndexTest.java | 2 +-
.../job/ClickhouseRefreshSecondaryIndex.java | 47 +++--------------
.../kap/clickhouse/job/RefreshSecondaryIndex.java | 60 ++++++++++------------
.../management/SecondStorageService.java | 8 +--
4 files changed, 39 insertions(+), 78 deletions(-)
diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
index b5473c9f1f..767b045172 100644
--- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
+++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
@@ -349,7 +349,7 @@ public class SecondStorageIndexTest implements JobWaiter {
String jobId = updatePrimaryIndexAndSecondaryIndex(modelName, null, Sets.newHashSet());
waitJobEnd(getProject(), jobId);
- assertThrows(String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), getProject()),
+ assertThrows(String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), getProject()),
KylinException.class, () -> updatePrimaryIndexAndSecondaryIndex(modelName, null, secondaryIndex));
clickhouse[0].start();
ClickHouseUtils.internalConfigClickHouse(clickhouse, replica);
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java
index 6f818af516..632cf60a88 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java
@@ -22,14 +22,12 @@ import static io.kyligence.kap.secondstorage.SecondStorageConstants.STEP_SECOND_
import static io.kyligence.kap.secondstorage.SecondStorageUtil.getTableFlow;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
@@ -39,9 +37,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
-import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -102,14 +98,15 @@ public class ClickhouseRefreshSecondaryIndex extends AbstractExecutable {
}
}
+ val dataflow = NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(modelId);
+ String database = NameUtil.getDatabase(getConfig(), getProject());
+ String table = NameUtil.getTable(dataflow, layoutId);
+ List<Future<?>> results = Lists.newArrayList();
List<SecondStorageNode> nodes = SecondStorageUtil.listProjectNodes(getProject());
- List<RefreshSecondaryIndex> allJob = getAddIndexJob(nodes, newIndexes, layoutId);
- allJob.addAll(getToBeDeleteIndexJob(nodes, toBeDeleteIndexed, layoutId));
-
- List<Future<?>> results = new ArrayList<>();
val taskPool = new ThreadPoolExecutor(nodes.size(), nodes.size(), 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory("Refresh Tiered Storage Index"));
- allJob.forEach(job -> results.add(taskPool.submit(job::refresh)));
+ nodes.forEach(node -> results.add(taskPool.submit(() -> new RefreshSecondaryIndex(node.getName(), database,
+ table, newIndexes, toBeDeleteIndexed, dataflow).refresh())));
try {
for (Future<?> result : results) {
@@ -128,36 +125,4 @@ public class ClickhouseRefreshSecondaryIndex extends AbstractExecutable {
return ExecuteResult.createSucceed();
});
}
-
- private List<RefreshSecondaryIndex> getAddIndexJob(List<SecondStorageNode> nodes, Set<Integer> newIndexes,
- long layoutId) {
- String modelId = getTargetSubject();
- val indexPlan = NIndexPlanManager.getInstance(getConfig(), project).getIndexPlan(modelId);
-
- if (indexPlan == null || indexPlan.getLayoutEntity(layoutId) == null) {
- return Lists.newArrayList();
- }
-
- LayoutEntity layout = indexPlan.getLayoutEntity(layoutId);
- String database = NameUtil.getDatabase(getConfig(), getProject());
- String table = NameUtil.getTable(NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(modelId),
- layoutId);
- return nodes.stream()
- .flatMap(node -> newIndexes.stream().map(column -> new RefreshSecondaryIndex(node.getName(), database,
- table, column, layout, RefreshSecondaryIndex.Type.ADD)))
- .collect(Collectors.toList());
- }
-
- private List<RefreshSecondaryIndex> getToBeDeleteIndexJob(List<SecondStorageNode> nodes,
- Set<Integer> toBeDeleteIndexed, long layoutId) {
- String modelId = getTargetSubject();
- String database = NameUtil.getDatabase(getConfig(), getProject());
- String table = NameUtil.getTable(NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(modelId),
- layoutId);
-
- return nodes.stream()
- .flatMap(node -> toBeDeleteIndexed.stream().map(column -> new RefreshSecondaryIndex(node.getName(),
- database, table, column, null, RefreshSecondaryIndex.Type.DELETE)))
- .collect(Collectors.toList());
- }
}
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java
index 6c6c71193c..8b021828cc 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java
@@ -26,9 +26,7 @@ import java.util.Set;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.cube.model.LayoutEntity;
-import org.apache.kylin.metadata.cube.model.NDataflowManager;
-import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.cube.model.NDataflow;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -49,33 +47,32 @@ import lombok.extern.slf4j.Slf4j;
@Getter
@Slf4j
public class RefreshSecondaryIndex {
-
@JsonProperty("node")
private String node;
@JsonProperty("database")
private String database;
@JsonProperty("table")
private String table;
- @JsonProperty("column_id")
- private Integer columnId;
- @JsonProperty("type")
- private Type type;
+ @JsonProperty("add_indexes")
+ private Set<Integer> addIndexes;
+ @JsonProperty("delete_indexes")
+ private Set<Integer> deleteIndexes;
@JsonIgnore
- private LayoutEntity layoutEntity;
+ private NDataflow dataflow;
public RefreshSecondaryIndex() {
// empty
}
- public RefreshSecondaryIndex(String node, String database, String table, Integer columnId,
- LayoutEntity layoutEntity, Type type) {
+ public RefreshSecondaryIndex(String node, String database, String table, Set<Integer> addIndexes,
+ Set<Integer> deleteIndexes, NDataflow dataflow) {
this.node = node;
this.database = database;
this.table = table;
- this.columnId = columnId;
- this.layoutEntity = layoutEntity;
- this.type = type;
+ this.dataflow = dataflow;
+ this.addIndexes = addIndexes;
+ this.deleteIndexes = deleteIndexes;
}
public void refresh() {
@@ -88,29 +85,31 @@ public class RefreshSecondaryIndex {
return;
}
Set<String> existSkipIndex = existSkippingIndex(clickHouse, database, table);
- String column = getPrefixColumn(String.valueOf(columnId));
- String indexName = ClickHouseNameUtil.getSkippingIndexName(table, column);
- if (type == Type.ADD) {
- addSkippingIndex(clickHouse, tableIdentifier, column, indexName, existSkipIndex);
- } else if (type == Type.DELETE) {
- deleteSkippingIndex(clickHouse, tableIdentifier, indexName, existSkipIndex);
+
+ for (Integer deleteIndexColumnId : deleteIndexes) {
+ deleteSkippingIndex(clickHouse, tableIdentifier, deleteIndexColumnId, existSkipIndex);
+ }
+
+ for (Integer addIndexColumnId : addIndexes) {
+ addSkippingIndex(clickHouse, tableIdentifier, addIndexColumnId, existSkipIndex);
}
} catch (SQLException e) {
- log.error("node {} clean index {}.{} failed", node, database, table);
+ log.error("node {} update index {}.{} failed", node, database, table);
ExceptionUtils.rethrow(e);
}
}
- private void addSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, String column,
- String indexName, Set<String> existSkipIndex) throws SQLException {
- NDataModel model = layoutEntity.getModel();
- KylinConfig modelConfig = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), model.getProject())
- .getDataflow(model.getId()).getConfig();
+ private void addSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, int columnId,
+ Set<String> existSkipIndex) throws SQLException {
+ String column = getPrefixColumn(String.valueOf(columnId));
+ String indexName = ClickHouseNameUtil.getSkippingIndexName(table, column);
+ KylinConfig modelConfig = dataflow.getConfig();
int granularity = modelConfig.getSecondStorageSkippingIndexGranularity();
val render = new ClickHouseRender();
String expr = SkippingIndexChooser
- .getSkippingIndexType(layoutEntity.getOrderedDimensions().get(columnId).getType()).toSql(modelConfig);
+ .getSkippingIndexType(dataflow.getModel().getEffectiveDimensions().get(columnId).getType())
+ .toSql(modelConfig);
AlterTable alterTable = new AlterTable(tableIdentifier,
new AlterTable.ManipulateIndex(indexName, column, expr, granularity));
AlterTable materializeTable = new AlterTable(tableIdentifier,
@@ -122,8 +121,9 @@ public class RefreshSecondaryIndex {
clickHouse.apply(materializeTable.toSql(render));
}
- private void deleteSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, String indexName,
+ private void deleteSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, int columnId,
Set<String> existSkipIndex) throws SQLException {
+ String indexName = ClickHouseNameUtil.getSkippingIndexName(table, getPrefixColumn(String.valueOf(columnId)));
if (!existSkipIndex.contains(indexName)) {
return;
}
@@ -150,8 +150,4 @@ public class RefreshSecondaryIndex {
return Sets.newHashSet();
}
-
- enum Type {
- ADD, DELETE;
- }
}
diff --git a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
index eb43fddf83..b0886204aa 100644
--- a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
+++ b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
@@ -1158,14 +1158,14 @@ public class SecondStorageService extends BasicService implements SecondStorageU
SecondStorageUtil.validateProjectLock(project, Collections.singletonList(LockTypeEnum.LOAD.name()));
List<AbstractExecutable> jobs = getRelationJobsWithoutFinish(project, modelId);
if (!jobs.isEmpty()) {
- throw new KylinException(JobErrorCode.SECOND_STORAGE_PROJECT_JOB_EXISTS,
- String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), project));
+ throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS,
+ String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), project));
}
jobs = getJobs(project, modelId, Sets.newHashSet(ExecutableState.ERROR),
Sets.newHashSet(JobTypeEnum.SECOND_STORAGE_REFRESH_SECONDARY_INDEXES));
if (!jobs.isEmpty()) {
- throw new KylinException(JobErrorCode.SECOND_STORAGE_PROJECT_JOB_EXISTS,
- String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), project));
+ throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS,
+ String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), project));
}
}