You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/11 08:38:17 UTC
[incubator-doris] branch master updated: [RoutineLoad] Support
pause or resume all routine load jobs (#6394)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 708b6c5 [RoutineLoad] Support pause or resume all routine load jobs (#6394)
708b6c5 is described below
commit 708b6c529e83e9f0b70bd9d5dd950721cb8a77a2
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Aug 11 16:38:06 2021 +0800
[RoutineLoad] Support pause or resume all routine load jobs (#6394)
1. PAUSE ALL ROUTINE LOAD;
2. RESUME ALL ROUTINE LOAD;
---
.../Data Manipulation/PAUSE ROUTINE LOAD.md | 11 ++-
.../Data Manipulation/RESUME ROUTINE LOAD.md | 11 ++-
.../Data Manipulation/PAUSE ROUTINE LOAD.md | 6 +-
.../Data Manipulation/RESUME ROUTINE LOAD.md | 7 +-
fe/fe-core/src/main/cup/sql_parser.cup | 8 ++
.../doris/analysis/PauseRoutineLoadStmt.java | 25 ++++-
.../doris/analysis/ResumeRoutineLoadStmt.java | 25 ++++-
.../doris/load/routineload/RoutineLoadManager.java | 101 ++++++++++++++++-----
.../load/routineload/RoutineLoadManagerTest.java | 64 +++++++++++++
9 files changed, 221 insertions(+), 37 deletions(-)
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md
index 7b98035..92c157a 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md
@@ -27,9 +27,14 @@ under the License.
# PAUSE ROUTINE LOAD
## example
-1. Suspend the routine import operation named test 1.
+1. Pause the routine load job named test1.
-PAUSE ROUTINE LOAD FOR test1;
+ PAUSE ROUTINE LOAD FOR test1;
+
+2. Pause all running routine load jobs.
+
+ PAUSE ALL ROUTINE LOAD;
## keyword
-PAUSE,ROUTINE,LOAD
+
+ PAUSE,ALL,ROUTINE,LOAD
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md
index 78d4755..26a499b 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md
@@ -27,9 +27,14 @@ under the License.
# RESUME ROUTINE LOAD
## example
-1. Restore the routine import job named test 1.
+1. Resume the routine load job named test1.
-RESUME ROUTINE LOAD FOR test1;
+ RESUME ROUTINE LOAD FOR test1;
+
+2. Resume all paused routine load jobs.
+
+ RESUME ALL ROUTINE LOAD;
## keyword
-RESUME,ROUTINE,LOAD
+
+ RESUME,ALL,ROUTINE,LOAD
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md
index 85f8c86..4a88095 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md
@@ -31,6 +31,10 @@ under the License.
PAUSE ROUTINE LOAD FOR test1;
+2. 暂停所有正在运行的例行导入作业
+
+ PAUSE ALL ROUTINE LOAD;
+
## keyword
- PAUSE,ROUTINE,LOAD
+ PAUSE,ALL,ROUTINE,LOAD
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md
index 9a3730b..5afa216 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md
@@ -31,6 +31,11 @@ under the License.
RESUME ROUTINE LOAD FOR test1;
+2. 恢复所有暂停中的例行导入作业。
+
+ RESUME ALL ROUTINE LOAD;
+
## keyword
- RESUME,ROUTINE,LOAD
+
+ RESUME,ALL,ROUTINE,LOAD
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 2981e84..d29bee8 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -1757,6 +1757,10 @@ pause_routine_load_stmt ::=
{:
RESULT = new PauseRoutineLoadStmt(jobLabel);
:}
+ | KW_PAUSE KW_ALL KW_ROUTINE KW_LOAD
+ {:
+ RESULT = new PauseRoutineLoadStmt(null);
+ :}
;
resume_routine_load_stmt ::=
@@ -1764,6 +1768,10 @@ resume_routine_load_stmt ::=
{:
RESULT = new ResumeRoutineLoadStmt(jobLabel);
:}
+ | KW_RESUME KW_ALL KW_ROUTINE KW_LOAD
+ {:
+ RESULT = new ResumeRoutineLoadStmt(null);
+ :}
;
stop_routine_load_stmt ::=
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java
index 19ae0c2..49f07f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java
@@ -17,9 +17,13 @@
package org.apache.doris.analysis;
-import org.apache.doris.common.AnalysisException;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
+import com.google.common.base.Strings;
+
/*
Pause routine load by name
@@ -29,22 +33,35 @@ import org.apache.doris.common.UserException;
public class PauseRoutineLoadStmt extends DdlStmt {
private final LabelName labelName;
+ private String db;
public PauseRoutineLoadStmt(LabelName labelName) {
this.labelName = labelName;
}
+ public boolean isAll() {
+ return labelName == null;
+ }
+
public String getName() {
return labelName.getLabelName();
}
public String getDbFullName(){
- return labelName.getDbName();
+ return db;
}
@Override
- public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+ public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
- labelName.analyze(analyzer);
+ if (labelName != null) {
+ labelName.analyze(analyzer);
+ db = labelName.getDbName();
+ } else {
+ if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+ }
+ db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb());
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java
index ca31856..888d032 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java
@@ -17,9 +17,13 @@
package org.apache.doris.analysis;
-import org.apache.doris.common.AnalysisException;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
+import com.google.common.base.Strings;
+
/*
Resume routine load job by name
@@ -29,22 +33,35 @@ import org.apache.doris.common.UserException;
public class ResumeRoutineLoadStmt extends DdlStmt{
private final LabelName labelName;
+ private String db;
public ResumeRoutineLoadStmt(LabelName labelName) {
this.labelName = labelName;
}
+ public boolean isAll() {
+ return labelName == null;
+ }
+
public String getName() {
return labelName.getLabelName();
}
public String getDbFullName() {
- return labelName.getDbName();
+ return db;
}
@Override
- public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+ public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
- labelName.analyze(analyzer);
+ if (labelName != null) {
+ labelName.analyze(analyzer);
+ db = labelName.getDbName();
+ } else {
+ if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+ }
+ db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb());
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 045fbd1..7b7e62f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -232,34 +232,93 @@ public class RoutineLoadManager implements Writable {
return routineLoadJob;
}
+ // Collect the jobs of the specified database that are not in a final state and whose table passes the LOAD privilege check
+ public List<RoutineLoadJob> checkPrivAndGetAllJobs(String dbName)
+ throws MetaNotFoundException, DdlException, AnalysisException {
+
+ List<RoutineLoadJob> result = Lists.newArrayList();
+ Database database = Catalog.getCurrentCatalog().getDb(dbName);
+ if (database == null) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
+ }
+ long dbId = database.getId();
+ Map<String, List<RoutineLoadJob>> jobMap = dbToNameToRoutineLoadJob.get(dbId);
+ if (jobMap == null) {
+ // no job map is registered for this database id: return the empty result
+ return result;
+ }
+
+ for (List<RoutineLoadJob> jobs : jobMap.values()) {
+ for (RoutineLoadJob job : jobs) {
+ if (!job.getState().isFinalState()) {
+ String tableName = job.getTableName();
+ if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
+ dbName, tableName, PrivPredicate.LOAD)) {
+ continue; // silently skip jobs whose table fails the LOAD privilege check
+ }
+ result.add(job);
+ }
+ }
+ }
+
+ return result;
+ }
+
public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt)
throws UserException {
- RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
- pauseRoutineLoadStmt.getName());
+ List<RoutineLoadJob> jobs = Lists.newArrayList();
+ if (pauseRoutineLoadStmt.isAll()) {
+ jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
+ } else {
+ RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
+ pauseRoutineLoadStmt.getName());
+ jobs.add(routineLoadJob);
+ }
- routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
- new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
- "User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
- false /* not replay */);
- LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state",
- routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg",
+ for (RoutineLoadJob routineLoadJob : jobs) {
+ try {
+ routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
+ new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
+ "User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
+ false /* not replay */);
+ LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state",
+ routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg",
"routine load job has been paused by user").build());
+ } catch (UserException e) {
+ LOG.warn("failed to pause routine load job {}", routineLoadJob.getName(), e);
+ continue;
+ }
+ }
}
public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws UserException {
- RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
- resumeRoutineLoadStmt.getName());
-
- routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
- routineLoadJob.autoResumeCount = 0;
- routineLoadJob.firstResumeTimestamp = 0;
- routineLoadJob.autoResumeLock = false;
- routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */);
- LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
- .add("current_state", routineLoadJob.getState())
- .add("user", ConnectContext.get().getQualifiedUser())
- .add("msg", "routine load job has been resumed by user")
- .build());
+
+ List<RoutineLoadJob> jobs = Lists.newArrayList();
+ if (resumeRoutineLoadStmt.isAll()) {
+ jobs = checkPrivAndGetAllJobs(resumeRoutineLoadStmt.getDbFullName());
+ } else {
+ RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
+ resumeRoutineLoadStmt.getName());
+ jobs.add(routineLoadJob);
+ }
+
+ for (RoutineLoadJob routineLoadJob : jobs) {
+ try {
+ routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
+ routineLoadJob.autoResumeCount = 0;
+ routineLoadJob.firstResumeTimestamp = 0;
+ routineLoadJob.autoResumeLock = false;
+ routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */);
+ LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
+ .add("current_state", routineLoadJob.getState())
+ .add("user", ConnectContext.get().getQualifiedUser())
+ .add("msg", "routine load job has been resumed by user")
+ .build());
+ } catch (UserException e) {
+ LOG.warn("failed to resume routine load job {}", routineLoadJob.getName(), e);
+ continue;
+ }
+ }
}
public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index fb8fe9b..f66b197 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -913,4 +913,68 @@ public class RoutineLoadManagerTest {
Assert.assertEquals(RoutineLoadJob.JobState.STOPPED, routineLoadJob.getState());
}
+
+ @Test
+ public void testPauseAndResumeAllRoutineLoadJob(@Injectable PauseRoutineLoadStmt pauseRoutineLoadStmt,
+ @Injectable ResumeRoutineLoadStmt resumeRoutineLoadStmt,
+ @Mocked Catalog catalog,
+ @Mocked Database database,
+ @Mocked PaloAuth paloAuth,
+ @Mocked ConnectContext connectContext) throws UserException {
+ RoutineLoadManager routineLoadManager = new RoutineLoadManager();
+ Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap();
+ Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap();
+
+ List<RoutineLoadJob> routineLoadJobList1 = Lists.newArrayList();
+ RoutineLoadJob routineLoadJob1 = new KafkaRoutineLoadJob();
+ Deencapsulation.setField(routineLoadJob1, "id", 1000L);
+ routineLoadJobList1.add(routineLoadJob1);
+
+ List<RoutineLoadJob> routineLoadJobList2 = Lists.newArrayList();
+ RoutineLoadJob routineLoadJob2 = new KafkaRoutineLoadJob();
+ Deencapsulation.setField(routineLoadJob2, "id", 1002L);
+ routineLoadJobList2.add(routineLoadJob2);
+
+ nameToRoutineLoadJob.put("job1", routineLoadJobList1);
+ nameToRoutineLoadJob.put("job2", routineLoadJobList2);
+ dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
+ Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob);
+
+ Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState());
+ Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob2.getState()); // precondition for the second job as well
+
+ new Expectations() {
+ {
+ pauseRoutineLoadStmt.isAll();
+ minTimes = 0;
+ result = true;
+ pauseRoutineLoadStmt.getDbFullName();
+ minTimes = 0;
+ result = "";
+ catalog.getDb("");
+ minTimes = 0;
+ result = database;
+ database.getId();
+ minTimes = 0;
+ result = 1L;
+ catalog.getAuth();
+ minTimes = 0;
+ result = paloAuth;
+ paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any);
+ minTimes = 0;
+ result = true;
+ resumeRoutineLoadStmt.isAll();
+ minTimes = 0;
+ result = true;
+ }
+ };
+
+ routineLoadManager.pauseRoutineLoadJob(pauseRoutineLoadStmt);
+ Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob1.getState());
+ Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob2.getState());
+
+ routineLoadManager.resumeRoutineLoadJob(resumeRoutineLoadStmt);
+ Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState());
+ Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob2.getState());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org