You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/02/19 11:45:42 UTC

[incubator-doris] branch master updated: [Alter] Change table's state right after all rollup jobs being cancelled

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a015cd0  [Alter] Change table's state right after all rollup jobs being cancelled
a015cd0 is described below

commit a015cd0c8bae9a561855a53d4f6df99be0d8ce37
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Feb 19 19:45:35 2020 +0800

    [Alter] Change table's state right after all rollup jobs being cancelled
---
 .../doris/alter/MaterializedViewHandler.java       | 26 +++++---
 .../org/apache/doris/alter/BatchRollupJobTest.java | 70 ++++++++++++++++++++--
 2 files changed, 84 insertions(+), 12 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index e4eaabb..6f8f482 100644
--- a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -740,13 +740,13 @@ public class MaterializedViewHandler extends AlterHandler {
         return new HashMap<>(alterJobsV2);
     }
 
-    private void removeJobFromRunningQueue(RollupJobV2 rollupJobV2) {
+    private void removeJobFromRunningQueue(AlterJobV2 alterJob) {
         synchronized (tableRunningJobMap) {
-            Set<Long> runningJobIdSet = tableRunningJobMap.get(rollupJobV2.getTableId());
+            Set<Long> runningJobIdSet = tableRunningJobMap.get(alterJob.getTableId());
             if (runningJobIdSet != null) {
-                runningJobIdSet.remove(rollupJobV2.getJobId());
+                runningJobIdSet.remove(alterJob.getJobId());
                 if (runningJobIdSet.size() == 0) {
-                    tableRunningJobMap.remove(rollupJobV2.getTableId());
+                    tableRunningJobMap.remove(alterJob.getTableId());
                 }
             }
         }
@@ -838,16 +838,23 @@ public class MaterializedViewHandler extends AlterHandler {
             // ATTN(cmy): there is still a short gap between "job finish" and "table become normal",
             // so if user send next alter job right after the "job finish",
             // it may encounter "table's state not NORMAL" error.
+
             if (alterJob.isDone()) {
-                removeJobFromRunningQueue(alterJob);
-                if (removeAlterJobV2FromTableNotFinalStateJobMap(alterJob)) {
-                    changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL);
-                }
+                onJobDone(alterJob);
                 continue;
             }
         }
     }
 
+    // remove job from running queue and state map, also set table's state to NORMAL if this is
+    // the last running job of the table.
+    private void onJobDone(AlterJobV2 alterJob) {
+        removeJobFromRunningQueue(alterJob);
+        if (removeAlterJobV2FromTableNotFinalStateJobMap(alterJob)) {
+            changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL);
+        }
+    }
+
     @Deprecated
     private void runOldAlterJob() {
         List<AlterJob> cancelledJobs = Lists.newArrayList();
@@ -1105,6 +1112,9 @@ public class MaterializedViewHandler extends AlterHandler {
         if (rollupJobV2List.size() != 0) {
             for (AlterJobV2 alterJobV2 : rollupJobV2List) {
                 alterJobV2.cancel("user cancelled");
+                if (alterJobV2.isDone()) {
+                    onJobDone(alterJobV2);
+                }
             }
             return;
         }
diff --git a/fe/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java b/fe/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java
index a24e307..594860b 100644
--- a/fe/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java
+++ b/fe/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java
@@ -18,6 +18,7 @@
 package org.apache.doris.alter;
 
 import org.apache.doris.analysis.AlterTableStmt;
+import org.apache.doris.analysis.CancelAlterTableStmt;
 import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.catalog.Catalog;
@@ -29,30 +30,43 @@ import org.apache.doris.catalog.Partition;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.utframe.UtFrameUtils;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 public class BatchRollupJobTest {
 
     private static String runningDir = "fe/mocked/BatchRollupJobTest/" + UUID.randomUUID().toString() + "/";
+    private static ConnectContext ctx;
 
     @BeforeClass
     public static void setup() throws Exception {
         UtFrameUtils.createMinDorisCluster(runningDir);
+        ctx = UtFrameUtils.createDefaultCtx();
     }
 
-    @Test
-    public void test() throws Exception {
-        ConnectContext ctx = UtFrameUtils.createDefaultCtx();
+    @Before
+    public void before() throws Exception {
+        Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
+        alterJobs.clear();
+
         // create database db1
-        String createDbStmtStr = "create database db1;";
+        String createDbStmtStr = "create database if not exists db1;";
         CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
         Catalog.getCurrentCatalog().createDb(createDbStmt);
         System.out.println(Catalog.getCurrentCatalog().getDbNames());
+    }
+
+    @Test
+    public void testBatchRollup() throws Exception {
         // create table tbl1
         String createTblStmtStr1 = "create table db1.tbl1(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');";
         CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr1, ctx);
@@ -96,4 +110,52 @@ public class BatchRollupJobTest {
             Assert.assertEquals(4, partition.getMaterializedIndices(IndexExtState.VISIBLE).size());
         }
     }
+
+    @Test
+    public void testCancelBatchRollup() throws Exception {
+        // create table tbl2
+        String createTblStmtStr1 = "create table db1.tbl2(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');";
+        CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr1, ctx);
+        Catalog.getCurrentCatalog().createTable(createTableStmt);
+
+        // batch add 3 rollups
+        String stmtStr = "alter table db1.tbl2 add rollup r1(k1) duplicate key(k1), r2(k1, k2) duplicate key(k1), r3(k2) duplicate key(k2);";
+        AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, ctx);
+        Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+
+        Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
+        Assert.assertEquals(3, alterJobs.size());
+        List<Long> jobIds = Lists.newArrayList(alterJobs.keySet());
+
+        Database db = Catalog.getCurrentCatalog().getDb("default_cluster:db1");
+        Assert.assertNotNull(db);
+        OlapTable tbl = (OlapTable) db.getTable("tbl2");
+        Assert.assertNotNull(tbl);
+
+        for (AlterJobV2 alterJobV2 : alterJobs.values()) {
+            if (alterJobV2.getType() != AlterJobV2.JobType.ROLLUP) {
+                continue;
+            }
+            while (!alterJobV2.getJobState().isFinalState()) {
+                System.out.println(
+                        "rollup job " + alterJobV2.getJobId() + " is running. state: " + alterJobV2.getJobState());
+                Thread.sleep(5000);
+            }
+            System.out.println("rollup job " + alterJobV2.getJobId() + " is done. state: " + alterJobV2.getJobState());
+            Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
+
+            Assert.assertEquals(OlapTableState.ROLLUP, tbl.getState());
+            // cancel rest of rollup jobs
+            stmtStr = "cancel alter table rollup from db1.tbl2 (" + Joiner.on(",").join(jobIds) + ")";
+            CancelAlterTableStmt cancelStmt = (CancelAlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, ctx);
+            Catalog.getCurrentCatalog().cancelAlter(cancelStmt);
+
+            Assert.assertEquals(OlapTableState.NORMAL, tbl.getState());
+            break;
+        }
+
+        for (Partition partition : tbl.getPartitions()) {
+            Assert.assertEquals(2, partition.getMaterializedIndices(IndexExtState.VISIBLE).size());
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org