You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ca...@apache.org on 2021/08/29 01:42:12 UTC

[incubator-doris] branch master updated: [Optimize] Make light schema change complete more faster under concurrent conditions (#6292)

This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a2a13da  [Optimize] Make light schema change complete more faster under concurrent conditions (#6292)
a2a13da is described below

commit a2a13dadba57dac8d3960334501acc8722f6c11c
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Sun Aug 29 09:41:56 2021 +0800

    [Optimize] Make light schema change complete more faster under concurrent conditions (#6292)
    
    * [Optimize] Make schema change complete more faster under concurrent conditions
    
    Co-authored-by: caiconghui <ca...@xiaomi.com>
---
 .../java/org/apache/doris/alter/AlterHandler.java  |  6 +-
 .../apache/doris/alter/SchemaChangeHandler.java    | 77 +++++++++++++++++++++-
 .../java/org/apache/doris/common/FeConstants.java  |  3 +
 3 files changed, 82 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
index 4cf206a..61fdcca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
@@ -86,7 +86,11 @@ public abstract class AlterHandler extends MasterDaemon {
     }
     
     public AlterHandler(String name) {
-        super(name, FeConstants.default_scheduler_interval_millisecond);
+        this(name, FeConstants.default_scheduler_interval_millisecond);
+    }
+
+    public AlterHandler(String name, int scheduler_interval_millisecond) {
+        super(name, scheduler_interval_millisecond);
     }
 
     protected void addAlterJobV2(AlterJobV2 alterJob) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index a109192..e25a27f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -61,12 +61,14 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
@@ -97,6 +99,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 public class SchemaChangeHandler extends AlterHandler {
@@ -105,8 +108,20 @@ public class SchemaChangeHandler extends AlterHandler {
     // all shadow indexes should have this prefix in name
     public static final String SHADOW_NAME_PRFIX = "__doris_shadow_";
 
+    public static final int MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE = 10;
+
+    public static final int CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB = 20;
+
+    public final ThreadPoolExecutor schemaChangeThreadPool = ThreadPoolManager.newDaemonCacheThreadPool(MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE, "schema-change-pool", true);
+
+    public final Map<Long, AlterJobV2> activeSchemaChangeJobsV2 = Maps.newConcurrentMap();
+
+    public final Map<Long, AlterJobV2> runnableSchemaChangeJobV2 = Maps.newConcurrentMap();
+
+    public int cycle_count = 0;
+
     public SchemaChangeHandler() {
-        super("schema change");
+        super("schema change", FeConstants.default_schema_change_scheduler_interval_millisecond);
     }
 
     private void processAddColumn(AddColumnClause alterClause, OlapTable olapTable,
@@ -1387,13 +1402,36 @@ public class SchemaChangeHandler extends AlterHandler {
 
     @Override
     protected void runAfterCatalogReady() {
-        super.runAfterCatalogReady();
+        if (cycle_count >= CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB) {
+            clearFinishedOrCancelledSchemaChangeJobV2();
+            super.runAfterCatalogReady();
+            cycle_count = 0;
+        }
         runOldAlterJob();
         runAlterJobV2();
+        cycle_count++;
     }
 
     private void runAlterJobV2() {
-        alterJobsV2.values().forEach(AlterJobV2::run);
+        runnableSchemaChangeJobV2.values().forEach(
+                alterJobsV2 -> {
+                    if (!alterJobsV2.isDone() && !activeSchemaChangeJobsV2.containsKey(alterJobsV2.getJobId()) &&
+                            activeSchemaChangeJobsV2.size() < MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE) {
+                        if (FeConstants.runningUnitTest) {
+                            alterJobsV2.run();
+                        } else {
+                            schemaChangeThreadPool.submit(() -> {
+                                if (activeSchemaChangeJobsV2.putIfAbsent(alterJobsV2.getJobId(), alterJobsV2) == null) {
+                                    try {
+                                        alterJobsV2.run();
+                                    } finally {
+                                        activeSchemaChangeJobsV2.remove(alterJobsV2.getJobId());
+                                    }
+                                }
+                            });
+                        }
+                    }
+                });
     }
 
     @Deprecated
@@ -1999,4 +2037,37 @@ public class SchemaChangeHandler extends AlterHandler {
             }
         }
     }
+
+    @Override
+    protected void addAlterJobV2(AlterJobV2 alterJob) {
+        super.addAlterJobV2(alterJob);
+        runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob);
+    }
+
+
+    private void clearFinishedOrCancelledSchemaChangeJobV2() {
+        Iterator<Map.Entry<Long, AlterJobV2>> iterator = runnableSchemaChangeJobV2.entrySet().iterator();
+        while (iterator.hasNext()) {
+            AlterJobV2 alterJobV2 = iterator.next().getValue();
+            if (alterJobV2.isDone()) {
+                iterator.remove();
+            }
+        }
+    }
+
+    @Override
+    public void replayRemoveAlterJobV2(RemoveAlterJobV2OperationLog log) {
+        if (runnableSchemaChangeJobV2.containsKey(log.getJobId())) {
+            runnableSchemaChangeJobV2.remove(log.getJobId());
+        }
+        super.replayRemoveAlterJobV2(log);
+    }
+
+    @Override
+    public void replayAlterJobV2(AlterJobV2 alterJob) {
+        if (!alterJob.isDone() && !runnableSchemaChangeJobV2.containsKey(alterJob.getJobId())) {
+            runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob);
+        }
+        super.replayAlterJobV2(alterJob);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index fc5b18d..b997939 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -47,6 +47,9 @@ public class FeConstants {
     // default scheduler interval is 10 seconds
     public static int default_scheduler_interval_millisecond = 10000;
 
+    // default schema change scheduler interval is 500 millisecond
+    public static int default_schema_change_scheduler_interval_millisecond = 500;
+
     // general model
     // Current meta data version. Use this version to write journals and image
     public static int meta_version = FeMetaVersion.VERSION_CURRENT;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org