You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/11 08:38:17 UTC

[incubator-doris] branch master updated: [RoutineLoad] Support pause or resume all routine load jobs (#6394)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 708b6c5  [RoutineLoad] Support pause or resume all routine load jobs (#6394)
708b6c5 is described below

commit 708b6c529e83e9f0b70bd9d5dd950721cb8a77a2
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Aug 11 16:38:06 2021 +0800

    [RoutineLoad] Support pause or resume all routine load jobs (#6394)
    
    1. PAUSE ALL ROUTINE LOAD;
    2. RESUME ALL ROUTINE LOAD;
---
 .../Data Manipulation/PAUSE ROUTINE LOAD.md        |  11 ++-
 .../Data Manipulation/RESUME ROUTINE LOAD.md       |  11 ++-
 .../Data Manipulation/PAUSE ROUTINE LOAD.md        |   6 +-
 .../Data Manipulation/RESUME ROUTINE LOAD.md       |   7 +-
 fe/fe-core/src/main/cup/sql_parser.cup             |   8 ++
 .../doris/analysis/PauseRoutineLoadStmt.java       |  25 ++++-
 .../doris/analysis/ResumeRoutineLoadStmt.java      |  25 ++++-
 .../doris/load/routineload/RoutineLoadManager.java | 101 ++++++++++++++++-----
 .../load/routineload/RoutineLoadManagerTest.java   |  64 +++++++++++++
 9 files changed, 221 insertions(+), 37 deletions(-)

diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md
index 7b98035..92c157a 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md	
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md	
@@ -27,9 +27,14 @@ under the License.
 # PAUSE ROUTINE LOAD
 ## example
 
-1. Suspend the routine import operation named test 1.
+1. Pause routine load named test1;
 
-PAUSE ROUTINE LOAD FOR test1;
+	PAUSE ROUTINE LOAD FOR test1;
+
+2. Pause all running routine load;
+
+    PAUSE ALL ROUTINE LOAD;
 
 ## keyword
-PAUSE,ROUTINE,LOAD
+
+	PAUSE,ALL,ROUTINE,LOAD
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md
index 78d4755..26a499b 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md	
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md	
@@ -27,9 +27,14 @@ under the License.
 # RESUME ROUTINE LOAD
 ## example
 
-1. Restore the routine import job named test 1.
+1. Resume routine load job named test1.
 
-RESUME ROUTINE LOAD FOR test1;
+	RESUME ROUTINE LOAD FOR test1;
+
+2. Resume all paused routine load job.
+
+    RESUME ALL ROUTINE LOAD;
 
 ## keyword
-RESUME,ROUTINE,LOAD
+
+	RESUME,ALL,ROUTINE,LOAD
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md
index 85f8c86..4a88095 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md	
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md	
@@ -31,6 +31,10 @@ under the License.
 
     PAUSE ROUTINE LOAD FOR test1;
 
+2. 暂停所有正在运行的例行导入作业
+
+    PAUSE ALL ROUTINE LOAD;
+
 ## keyword
-    PAUSE,ROUTINE,LOAD
+    PAUSE,ALL,ROUTINE,LOAD
 
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md
index 9a3730b..5afa216 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md	
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md	
@@ -31,6 +31,11 @@ under the License.
 
     RESUME ROUTINE LOAD FOR test1;
 
+2. 恢复所有暂停中的例行导入作业。
+
+    RESUME ALL ROUTINE LOAD;
+
 ## keyword
-    RESUME,ROUTINE,LOAD
+
+    RESUME,ALL,ROUTINE,LOAD
 
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 2981e84..d29bee8 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -1757,6 +1757,10 @@ pause_routine_load_stmt ::=
     {:
         RESULT = new PauseRoutineLoadStmt(jobLabel);
     :}
+    | KW_PAUSE KW_ALL KW_ROUTINE KW_LOAD
+    {:
+        RESULT = new PauseRoutineLoadStmt(null);
+    :}
     ;
 
 resume_routine_load_stmt ::=
@@ -1764,6 +1768,10 @@ resume_routine_load_stmt ::=
     {:
         RESULT = new ResumeRoutineLoadStmt(jobLabel);
     :}
+    | KW_RESUME KW_ALL KW_ROUTINE KW_LOAD
+    {:
+        RESULT = new ResumeRoutineLoadStmt(null);
+    :}
     ;
 
 stop_routine_load_stmt ::=
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java
index 19ae0c2..49f07f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java
@@ -17,9 +17,13 @@
 
 package org.apache.doris.analysis;
 
-import org.apache.doris.common.AnalysisException;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 
+import com.google.common.base.Strings;
+
 /*
   Pause routine load by name
 
@@ -29,22 +33,35 @@ import org.apache.doris.common.UserException;
 public class PauseRoutineLoadStmt extends DdlStmt {
 
     private final LabelName labelName;
+    private String db;
 
     public PauseRoutineLoadStmt(LabelName labelName) {
         this.labelName = labelName;
     }
 
+    public boolean isAll() {
+        return labelName == null;
+    }
+
     public String getName() {
         return labelName.getLabelName();
     }
 
     public String getDbFullName(){
-        return labelName.getDbName();
+        return db;
     }
 
     @Override
-    public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+    public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
-        labelName.analyze(analyzer);
+        if (labelName != null) {
+            labelName.analyze(analyzer);
+            db = labelName.getDbName();
+        } else {
+            if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+            }
+            db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb());
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java
index ca31856..888d032 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java
@@ -17,9 +17,13 @@
 
 package org.apache.doris.analysis;
 
-import org.apache.doris.common.AnalysisException;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 
+import com.google.common.base.Strings;
+
 /*
   Resume routine load job by name
 
@@ -29,22 +33,35 @@ import org.apache.doris.common.UserException;
 public class ResumeRoutineLoadStmt extends DdlStmt{
 
     private final LabelName labelName;
+    private String db;
 
     public ResumeRoutineLoadStmt(LabelName labelName) {
         this.labelName = labelName;
     }
 
+    public boolean isAll() {
+        return labelName == null;
+    }
+
     public String getName() {
         return labelName.getLabelName();
     }
 
     public String getDbFullName() {
-        return labelName.getDbName();
+        return db;
     }
 
     @Override
-    public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+    public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
-        labelName.analyze(analyzer);
+        if (labelName != null) {
+            labelName.analyze(analyzer);
+            db = labelName.getDbName();
+        } else {
+            if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+            }
+            db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb());
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 045fbd1..7b7e62f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -232,34 +232,93 @@ public class RoutineLoadManager implements Writable {
         return routineLoadJob;
     }
 
+    // get all jobs which state is not in final state from specified database
+    public List<RoutineLoadJob> checkPrivAndGetAllJobs(String dbName)
+            throws MetaNotFoundException, DdlException, AnalysisException {
+
+        List<RoutineLoadJob> result = Lists.newArrayList();
+        Database database = Catalog.getCurrentCatalog().getDb(dbName);
+        if (database == null) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
+        }
+        long dbId = database.getId();
+        Map<String, List<RoutineLoadJob>> jobMap = dbToNameToRoutineLoadJob.get(dbId);
+        if (jobMap == null) {
+            // return empty result
+            return result;
+        }
+
+        for (List<RoutineLoadJob> jobs : jobMap.values()) {
+            for (RoutineLoadJob job : jobs) {
+                if (!job.getState().isFinalState()) {
+                    String tableName = job.getTableName();
+                    if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
+                            dbName, tableName, PrivPredicate.LOAD)) {
+                        continue;
+                    }
+                    result.add(job);
+                }
+            }
+        }
+
+        return result;
+    }
+
     public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt)
             throws UserException {
-        RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
-                pauseRoutineLoadStmt.getName());
+        List<RoutineLoadJob> jobs = Lists.newArrayList();
+        if (pauseRoutineLoadStmt.isAll()) {
+            jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
+        } else {
+            RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
+                    pauseRoutineLoadStmt.getName());
+            jobs.add(routineLoadJob);
+        }
 
-        routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
-                new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
-                        "User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
-                false /* not replay */);
-        LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state",
-                routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg",
+        for (RoutineLoadJob routineLoadJob : jobs) {
+            try {
+                routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
+                        new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
+                                "User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
+                        false /* not replay */);
+                LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state",
+                        routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg",
                         "routine load job has been paused by user").build());
+            } catch (UserException e) {
+                LOG.warn("failed to pause routine load job {}", routineLoadJob.getName(), e);
+                continue;
+            }
+        }
     }
 
     public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws UserException {
-        RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
-                resumeRoutineLoadStmt.getName());
-
-        routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
-        routineLoadJob.autoResumeCount = 0;
-        routineLoadJob.firstResumeTimestamp = 0;
-        routineLoadJob.autoResumeLock = false;
-        routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */);
-        LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
-                         .add("current_state", routineLoadJob.getState())
-                         .add("user", ConnectContext.get().getQualifiedUser())
-                         .add("msg", "routine load job has been resumed by user")
-                         .build());
+
+        List<RoutineLoadJob> jobs = Lists.newArrayList();
+        if (resumeRoutineLoadStmt.isAll()) {
+            jobs = checkPrivAndGetAllJobs(resumeRoutineLoadStmt.getDbFullName());
+        } else {
+            RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
+                    resumeRoutineLoadStmt.getName());
+            jobs.add(routineLoadJob);
+        }
+
+        for (RoutineLoadJob routineLoadJob : jobs) {
+            try {
+                routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
+                routineLoadJob.autoResumeCount = 0;
+                routineLoadJob.firstResumeTimestamp = 0;
+                routineLoadJob.autoResumeLock = false;
+                routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */);
+                LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
+                        .add("current_state", routineLoadJob.getState())
+                        .add("user", ConnectContext.get().getQualifiedUser())
+                        .add("msg", "routine load job has been resumed by user")
+                        .build());
+            } catch (UserException e) {
+                LOG.warn("failed to resume routine load job {}", routineLoadJob.getName(), e);
+                continue;
+            }
+        }
     }
 
     public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index fb8fe9b..f66b197 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -913,4 +913,68 @@ public class RoutineLoadManagerTest {
 
         Assert.assertEquals(RoutineLoadJob.JobState.STOPPED, routineLoadJob.getState());
     }
+
+    @Test
+    public void testPauseAndResumeAllRoutineLoadJob(@Injectable PauseRoutineLoadStmt pauseRoutineLoadStmt,
+                                                    @Injectable ResumeRoutineLoadStmt resumeRoutineLoadStmt,
+                                                    @Mocked Catalog catalog,
+                                                    @Mocked Database database,
+                                                    @Mocked PaloAuth paloAuth,
+                                                    @Mocked ConnectContext connectContext) throws UserException {
+        RoutineLoadManager routineLoadManager = new RoutineLoadManager();
+        Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap();
+        Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap();
+
+        List<RoutineLoadJob> routineLoadJobList1 = Lists.newArrayList();
+        RoutineLoadJob routineLoadJob1 = new KafkaRoutineLoadJob();
+        Deencapsulation.setField(routineLoadJob1, "id", 1000L);
+        routineLoadJobList1.add(routineLoadJob1);
+
+        List<RoutineLoadJob> routineLoadJobList2 = Lists.newArrayList();
+        RoutineLoadJob routineLoadJob2 = new KafkaRoutineLoadJob();
+        Deencapsulation.setField(routineLoadJob2, "id", 1002L);
+        routineLoadJobList2.add(routineLoadJob2);
+
+        nameToRoutineLoadJob.put("job1", routineLoadJobList1);
+        nameToRoutineLoadJob.put("job2", routineLoadJobList2);
+        dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
+        Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob);
+
+        Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState());
+        Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState());
+
+        new Expectations() {
+            {
+                pauseRoutineLoadStmt.isAll();
+                minTimes = 0;
+                result = true;
+                pauseRoutineLoadStmt.getDbFullName();
+                minTimes = 0;
+                result = "";
+                catalog.getDb("");
+                minTimes = 0;
+                result = database;
+                database.getId();
+                minTimes = 0;
+                result = 1L;
+                catalog.getAuth();
+                minTimes = 0;
+                result = paloAuth;
+                paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any);
+                minTimes = 0;
+                result = true;
+                resumeRoutineLoadStmt.isAll();
+                minTimes = 0;
+                result = true;
+            }
+        };
+
+        routineLoadManager.pauseRoutineLoadJob(pauseRoutineLoadStmt);
+        Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob1.getState());
+        Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob2.getState());
+
+        routineLoadManager.resumeRoutineLoadJob(resumeRoutineLoadStmt);
+        Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState());
+        Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob2.getState());
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org