You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/02/19 11:45:42 UTC
[incubator-doris] branch master updated: [Alter] Change table's
state right after all rollup jobs being cancelled
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new a015cd0 [Alter] Change table's state right after all rollup jobs being cancelled
a015cd0 is described below
commit a015cd0c8bae9a561855a53d4f6df99be0d8ce37
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Feb 19 19:45:35 2020 +0800
[Alter] Change table's state right after all rollup jobs being cancelled
---
.../doris/alter/MaterializedViewHandler.java | 26 +++++---
.../org/apache/doris/alter/BatchRollupJobTest.java | 70 ++++++++++++++++++++--
2 files changed, 84 insertions(+), 12 deletions(-)
diff --git a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index e4eaabb..6f8f482 100644
--- a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -740,13 +740,13 @@ public class MaterializedViewHandler extends AlterHandler {
return new HashMap<>(alterJobsV2);
}
- private void removeJobFromRunningQueue(RollupJobV2 rollupJobV2) {
+ private void removeJobFromRunningQueue(AlterJobV2 alterJob) {
synchronized (tableRunningJobMap) {
- Set<Long> runningJobIdSet = tableRunningJobMap.get(rollupJobV2.getTableId());
+ Set<Long> runningJobIdSet = tableRunningJobMap.get(alterJob.getTableId());
if (runningJobIdSet != null) {
- runningJobIdSet.remove(rollupJobV2.getJobId());
+ runningJobIdSet.remove(alterJob.getJobId());
if (runningJobIdSet.size() == 0) {
- tableRunningJobMap.remove(rollupJobV2.getTableId());
+ tableRunningJobMap.remove(alterJob.getTableId());
}
}
}
@@ -838,16 +838,23 @@ public class MaterializedViewHandler extends AlterHandler {
// ATTN(cmy): there is still a short gap between "job finish" and "table become normal",
// so if user send next alter job right after the "job finish",
// it may encounter "table's state not NORMAL" error.
+
if (alterJob.isDone()) {
- removeJobFromRunningQueue(alterJob);
- if (removeAlterJobV2FromTableNotFinalStateJobMap(alterJob)) {
- changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL);
- }
+ onJobDone(alterJob);
continue;
}
}
}
+ // remove job from running queue and state map, also set table's state to NORMAL if this is
+ // the last running job of the table.
+ private void onJobDone(AlterJobV2 alterJob) {
+ removeJobFromRunningQueue(alterJob);
+ if (removeAlterJobV2FromTableNotFinalStateJobMap(alterJob)) {
+ changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL);
+ }
+ }
+
@Deprecated
private void runOldAlterJob() {
List<AlterJob> cancelledJobs = Lists.newArrayList();
@@ -1105,6 +1112,9 @@ public class MaterializedViewHandler extends AlterHandler {
if (rollupJobV2List.size() != 0) {
for (AlterJobV2 alterJobV2 : rollupJobV2List) {
alterJobV2.cancel("user cancelled");
+ if (alterJobV2.isDone()) {
+ onJobDone(alterJobV2);
+ }
}
return;
}
diff --git a/fe/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java b/fe/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java
index a24e307..594860b 100644
--- a/fe/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java
+++ b/fe/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java
@@ -18,6 +18,7 @@
package org.apache.doris.alter;
import org.apache.doris.analysis.AlterTableStmt;
+import org.apache.doris.analysis.CancelAlterTableStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.Catalog;
@@ -29,30 +30,43 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.utframe.UtFrameUtils;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
public class BatchRollupJobTest {
private static String runningDir = "fe/mocked/BatchRollupJobTest/" + UUID.randomUUID().toString() + "/";
+ private static ConnectContext ctx;
@BeforeClass
public static void setup() throws Exception {
UtFrameUtils.createMinDorisCluster(runningDir);
+ ctx = UtFrameUtils.createDefaultCtx();
}
- @Test
- public void test() throws Exception {
- ConnectContext ctx = UtFrameUtils.createDefaultCtx();
+ @Before
+ public void before() throws Exception {
+ Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
+ alterJobs.clear();
+
// create database db1
- String createDbStmtStr = "create database db1;";
+ String createDbStmtStr = "create database if not exists db1;";
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
Catalog.getCurrentCatalog().createDb(createDbStmt);
System.out.println(Catalog.getCurrentCatalog().getDbNames());
+ }
+
+ @Test
+ public void testBatchRollup() throws Exception {
// create table tbl1
String createTblStmtStr1 = "create table db1.tbl1(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');";
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr1, ctx);
@@ -96,4 +110,52 @@ public class BatchRollupJobTest {
Assert.assertEquals(4, partition.getMaterializedIndices(IndexExtState.VISIBLE).size());
}
}
+
+ @Test
+ public void testCancelBatchRollup() throws Exception {
+ // create table tbl2
+ String createTblStmtStr1 = "create table db1.tbl2(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');";
+ CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr1, ctx);
+ Catalog.getCurrentCatalog().createTable(createTableStmt);
+
+ // batch add 3 rollups
+ String stmtStr = "alter table db1.tbl2 add rollup r1(k1) duplicate key(k1), r2(k1, k2) duplicate key(k1), r3(k2) duplicate key(k2);";
+ AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, ctx);
+ Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+
+ Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
+ Assert.assertEquals(3, alterJobs.size());
+ List<Long> jobIds = Lists.newArrayList(alterJobs.keySet());
+
+ Database db = Catalog.getCurrentCatalog().getDb("default_cluster:db1");
+ Assert.assertNotNull(db);
+ OlapTable tbl = (OlapTable) db.getTable("tbl2");
+ Assert.assertNotNull(tbl);
+
+ for (AlterJobV2 alterJobV2 : alterJobs.values()) {
+ if (alterJobV2.getType() != AlterJobV2.JobType.ROLLUP) {
+ continue;
+ }
+ while (!alterJobV2.getJobState().isFinalState()) {
+ System.out.println(
+ "rollup job " + alterJobV2.getJobId() + " is running. state: " + alterJobV2.getJobState());
+ Thread.sleep(5000);
+ }
+ System.out.println("rollup job " + alterJobV2.getJobId() + " is done. state: " + alterJobV2.getJobState());
+ Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
+
+ Assert.assertEquals(OlapTableState.ROLLUP, tbl.getState());
+ // cancel rest of rollup jobs
+ stmtStr = "cancel alter table rollup from db1.tbl2 (" + Joiner.on(",").join(jobIds) + ")";
+ CancelAlterTableStmt cancelStmt = (CancelAlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, ctx);
+ Catalog.getCurrentCatalog().cancelAlter(cancelStmt);
+
+ Assert.assertEquals(OlapTableState.NORMAL, tbl.getState());
+ break;
+ }
+
+ for (Partition partition : tbl.getPartitions()) {
+ Assert.assertEquals(2, partition.getMaterializedIndices(IndexExtState.VISIBLE).size());
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org