You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ca...@apache.org on 2021/08/29 01:42:12 UTC
[incubator-doris] branch master updated: [Optimize] Make light
schema change complete faster under concurrent conditions (#6292)
This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new a2a13da [Optimize] Make light schema change complete faster under concurrent conditions (#6292)
a2a13da is described below
commit a2a13dadba57dac8d3960334501acc8722f6c11c
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Sun Aug 29 09:41:56 2021 +0800
[Optimize] Make light schema change complete faster under concurrent conditions (#6292)
* [Optimize] Make schema change complete faster under concurrent conditions
Co-authored-by: caiconghui <ca...@xiaomi.com>
---
.../java/org/apache/doris/alter/AlterHandler.java | 6 +-
.../apache/doris/alter/SchemaChangeHandler.java | 77 +++++++++++++++++++++-
.../java/org/apache/doris/common/FeConstants.java | 3 +
3 files changed, 82 insertions(+), 4 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
index 4cf206a..61fdcca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
@@ -86,7 +86,11 @@ public abstract class AlterHandler extends MasterDaemon {
}
public AlterHandler(String name) {
- super(name, FeConstants.default_scheduler_interval_millisecond);
+ this(name, FeConstants.default_scheduler_interval_millisecond);
+ }
+
+ public AlterHandler(String name, int scheduler_interval_millisecond) {
+ super(name, scheduler_interval_millisecond);
}
protected void addAlterJobV2(AlterJobV2 alterJob) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index a109192..e25a27f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -61,12 +61,14 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
@@ -97,6 +99,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class SchemaChangeHandler extends AlterHandler {
@@ -105,8 +108,20 @@ public class SchemaChangeHandler extends AlterHandler {
// all shadow indexes should have this prefix in name
public static final String SHADOW_NAME_PRFIX = "__doris_shadow_";
+ public static final int MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE = 10;
+
+ public static final int CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB = 20;
+
+ public final ThreadPoolExecutor schemaChangeThreadPool = ThreadPoolManager.newDaemonCacheThreadPool(MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE, "schema-change-pool", true);
+
+ public final Map<Long, AlterJobV2> activeSchemaChangeJobsV2 = Maps.newConcurrentMap();
+
+ public final Map<Long, AlterJobV2> runnableSchemaChangeJobV2 = Maps.newConcurrentMap();
+
+ public int cycle_count = 0;
+
public SchemaChangeHandler() {
- super("schema change");
+ super("schema change", FeConstants.default_schema_change_scheduler_interval_millisecond);
}
private void processAddColumn(AddColumnClause alterClause, OlapTable olapTable,
@@ -1387,13 +1402,36 @@ public class SchemaChangeHandler extends AlterHandler {
@Override
protected void runAfterCatalogReady() {
- super.runAfterCatalogReady();
+ if (cycle_count >= CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB) {
+ clearFinishedOrCancelledSchemaChangeJobV2();
+ super.runAfterCatalogReady();
+ cycle_count = 0;
+ }
runOldAlterJob();
runAlterJobV2();
+ cycle_count++;
}
private void runAlterJobV2() {
- alterJobsV2.values().forEach(AlterJobV2::run);
+ runnableSchemaChangeJobV2.values().forEach(
+ alterJobsV2 -> {
+ if (!alterJobsV2.isDone() && !activeSchemaChangeJobsV2.containsKey(alterJobsV2.getJobId()) &&
+ activeSchemaChangeJobsV2.size() < MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE) {
+ if (FeConstants.runningUnitTest) {
+ alterJobsV2.run();
+ } else {
+ schemaChangeThreadPool.submit(() -> {
+ if (activeSchemaChangeJobsV2.putIfAbsent(alterJobsV2.getJobId(), alterJobsV2) == null) {
+ try {
+ alterJobsV2.run();
+ } finally {
+ activeSchemaChangeJobsV2.remove(alterJobsV2.getJobId());
+ }
+ }
+ });
+ }
+ }
+ });
}
@Deprecated
@@ -1999,4 +2037,37 @@ public class SchemaChangeHandler extends AlterHandler {
}
}
}
+
+ @Override
+ protected void addAlterJobV2(AlterJobV2 alterJob) {
+ super.addAlterJobV2(alterJob);
+ runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob);
+ }
+
+
+ private void clearFinishedOrCancelledSchemaChangeJobV2() {
+ Iterator<Map.Entry<Long, AlterJobV2>> iterator = runnableSchemaChangeJobV2.entrySet().iterator();
+ while (iterator.hasNext()) {
+ AlterJobV2 alterJobV2 = iterator.next().getValue();
+ if (alterJobV2.isDone()) {
+ iterator.remove();
+ }
+ }
+ }
+
+ @Override
+ public void replayRemoveAlterJobV2(RemoveAlterJobV2OperationLog log) {
+ if (runnableSchemaChangeJobV2.containsKey(log.getJobId())) {
+ runnableSchemaChangeJobV2.remove(log.getJobId());
+ }
+ super.replayRemoveAlterJobV2(log);
+ }
+
+ @Override
+ public void replayAlterJobV2(AlterJobV2 alterJob) {
+ if (!alterJob.isDone() && !runnableSchemaChangeJobV2.containsKey(alterJob.getJobId())) {
+ runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob);
+ }
+ super.replayAlterJobV2(alterJob);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index fc5b18d..b997939 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -47,6 +47,9 @@ public class FeConstants {
// default scheduler interval is 10 seconds
public static int default_scheduler_interval_millisecond = 10000;
+ // default schema change scheduler interval is 500 milliseconds
+ public static int default_schema_change_scheduler_interval_millisecond = 500;
+
// general model
// Current meta data version. Use this version to write journals and image
public static int meta_version = FeMetaVersion.VERSION_CURRENT;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org