You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/04/26 13:30:32 UTC

[incubator-doris] branch master updated: [Load] Add more info in SHOW LOAD result (#3391)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a934ec  [Load] Add more info in SHOW LOAD result (#3391)
9a934ec is described below

commit 9a934ec9f6c3eedf184303823cde43ae2975912b
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Sun Apr 26 21:30:23 2020 +0800

    [Load] Add more info in SHOW LOAD result (#3391)
    
    Fix #3390
    This CL adds more info to the `JobDetails` column of the `SHOW LOAD` result for Broker Load jobs.
    
    For example:
    ```
    {
    	"Unfinished backends": {
    		"9c3441027ff948a0-8287923329a2b6a7": [10002]
    	},
            "All backends": {
    		"9c3441027ff948a0-8287923329a2b6a7": [10002, 10004, 10006]
    	},
    	"ScannedRows": 2390016,
    	"TaskNumber": 1,
    	"FileNumber": 1,
    	"FileSize": 1073741824
    }
    ```
    
    2 newly added keys:
    
    `Unfinished backends` lists the BEs whose tasks for this job have not finished yet.
    `All backends` lists all the BEs on which this job has tasks.
    
    One more thing: the Backend Id is now passed along with the heartbeat message from FE to BE,
    so that each BE knows its own Id.
---
 be/src/agent/heartbeat_server.cpp                  |  4 ++
 be/src/runtime/fragment_mgr.cpp                    |  5 +++
 .../load-data/broker-load-manual.md                |  8 ++--
 .../load-data/broker-load-manual_EN.md             | 14 ++++++-
 .../java/org/apache/doris/load/loadv2/LoadJob.java | 43 +++++++++++++++++-----
 .../org/apache/doris/load/loadv2/LoadManager.java  | 10 +++--
 .../main/java/org/apache/doris/qe/Coordinator.java | 13 ++++---
 .../java/org/apache/doris/system/HeartbeatMgr.java |  1 +
 gensrc/thrift/FrontendService.thrift               |  2 +
 gensrc/thrift/HeartbeatService.thrift              |  1 +
 10 files changed, 77 insertions(+), 24 deletions(-)

diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp
index ed72137..08915b5 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -153,6 +153,10 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
         heartbeat_flags->update(master_info.heartbeat_flags);
     }
 
+    if (master_info.__isset.backend_id) {
+        _master_info->__set_backend_id(master_info.backend_id);
+    }
+
     if (need_report) {
         LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
         _olap_engine->trigger_report();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 593a6bb..c709d06 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -37,6 +37,7 @@
 #include "util/url_coding.h"
 #include "runtime/client_cache.h"
 #include "runtime/descriptors.h"
+#include "gen_cpp/HeartbeatService.h"
 #include "gen_cpp/PaloInternalService_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "gen_cpp/Types_types.h"
@@ -321,6 +322,10 @@ void FragmentExecState::coordinator_callback(
         params.__isset.error_log = (params.error_log.size() > 0);
     }
 
+    if (_exec_env->master_info()->__isset.backend_id) {
+        params.__set_backend_id(_exec_env->master_info()->backend_id);
+    }
+
     TReportExecStatusResult res;
     Status rpc_status;
 
diff --git a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md
index da203cf..1586747 100644
--- a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md
@@ -278,7 +278,7 @@ mysql> show load order by createtime desc limit 1\G
  LoadStartTime: 2019-07-27 11:46:44
 LoadFinishTime: 2019-07-27 11:50:16
            URL: http://192.168.1.1:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317
-    JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
+    JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}
 ```
 
 下面主要介绍了查看导入命令返回结果集中参数意义:
@@ -357,9 +357,11 @@ LoadFinishTime: 2019-07-27 11:50:16
 
 + JobDetails
 
-    显示一些作业的详细运行状态。包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数等。
+    显示一些作业的详细运行状态。包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数,运行子任务的 BE 节点 Id,未完成的 BE 节点 Id。
 
-    ```{"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}```
+    ```
+    {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}
+    ```
 
     其中已处理的原始行数,每 5 秒更新一次。该行数仅用于展示当前的进度,不代表最终实际的处理行数。实际处理行数以 EtlInfo 中显示的为准。
 
diff --git a/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md b/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md
index 94e4fe8..311fb45 100644
--- a/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md
+++ b/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md
@@ -270,7 +270,7 @@ mysql> show load order by createtime desc limit 1\G
  LoadStartTime: 2019-07-27 11:46:44
 LoadFinishTime: 2019-07-27 11:50:16
            URL: http://192.168.1.1:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317
-    JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
+    JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}
 ```
 
 The following is mainly about the significance of viewing the parameters in the return result set of the import command:
@@ -344,6 +344,16 @@ The following is mainly about the significance of viewing the parameters in the
 
 	The error data sample of the import task can be obtained by accessing the URL address. When there is no error data in this import, the URL field is N/A.
 
++ JobDetails
+
+    Displays details of the job's running status, including the number of files, the total file size (bytes), the number of subtasks, the scanned rows, the ids of the backends running subtasks, and the ids of the backends that have not finished.
+
+    ```
+    {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}
+    ```
+
+    This information is updated every 5 seconds. ScannedRows is only used to display the job's progress; it does not represent the final number of rows actually processed.
+
 ### Cancel load
 
 When the Broker load job status is not CANCELLED or FINISHED, it can be manually cancelled by the user. When canceling, you need to specify a Label for the import task to be cancelled. Canceling Import command syntax can perform `HELP CANCEL LOAD` view.
@@ -485,4 +495,4 @@ Cluster situation: The number of BEs in the cluster is about 3, and the Broker n
      `` `
      Represents getting the column with (tmp_c1, tmp_c2) as the column name in parquet or orc, which is mapped to the (id, name) column in the doris table. If set is not set, the column names in the column are used as the mapping relationship.
 
-     Note: If the orc file directly generated by some hive versions is used, the table header in the orc file is not the column name in the hive meta, but (_col0, _col1, _col2, ...), which may cause the Invalid Column Name error, then You need to use set for mapping.
\ No newline at end of file
+     Note: If the orc file directly generated by some hive versions is used, the table header in the orc file is not the column name in the hive meta, but (_col0, _col1, _col2, ...), which may cause the Invalid Column Name error, then You need to use set for mapping.
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 08efaeb..c5b7fba 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -33,6 +33,7 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.common.util.TimeUtils;
@@ -138,30 +139,41 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         // load task id -> fragment id -> rows count
         private Table<TUniqueId, TUniqueId, Long> counterTbl = HashBasedTable.create();
 
+        // load task id -> unfinished backend id list
+        private Map<TUniqueId, List<Long>> unfinishedBackendIds = Maps.newHashMap();
+        // load task id -> all backend id list
+        private Map<TUniqueId, List<Long>> allBackendIds = Maps.newHashMap();
+
         // number of file to be loaded
         public int fileNum = 0;
         public long totalFileSizeB = 0;
 
         // init the statistic of specified load task
-        public synchronized void initLoad(TUniqueId loadId, Set<TUniqueId> fragmentIds) {
+        public synchronized void initLoad(TUniqueId loadId, Set<TUniqueId> fragmentIds, List<Long> relatedBackendIds) {
             counterTbl.rowMap().remove(loadId);
             for (TUniqueId fragId : fragmentIds) {
                 counterTbl.put(loadId, fragId, 0L);
             }
+            allBackendIds.put(loadId, relatedBackendIds);
+            // need to get a copy of relatedBackendIds, so that when we modify the "relatedBackendIds" in
+            // allBackendIds, the list in unfinishedBackendIds will not be changed.
+            unfinishedBackendIds.put(loadId, Lists.newArrayList(relatedBackendIds));
         }
 
         public synchronized void removeLoad(TUniqueId loadId) {
             counterTbl.rowMap().remove(loadId);
+            unfinishedBackendIds.remove(loadId);
+            allBackendIds.remove(loadId);
         }
 
-        public synchronized void updateLoad(TUniqueId loadId, TUniqueId fragmentId, long rows) {
+        public synchronized void updateLoadProgress(long backendId, TUniqueId loadId, TUniqueId fragmentId,
+                long rows, boolean isDone) {
             if (counterTbl.contains(loadId, fragmentId)) {
                 counterTbl.put(loadId, fragmentId, rows);
             }
-        }
-
-        public synchronized void clearAllLoads() {
-            counterTbl.clear();
+            if (isDone && unfinishedBackendIds.containsKey(loadId)) {
+                unfinishedBackendIds.get(loadId).remove(backendId);
+            }
         }
 
         public synchronized String toJson() {
@@ -175,9 +187,20 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             details.put("FileNumber", fileNum);
             details.put("FileSize", totalFileSizeB);
             details.put("TaskNumber", counterTbl.rowMap().size());
+            details.put("TaskNumber", counterTbl.rowMap().size());
+            details.put("Unfinished backends", getPrintableMap(unfinishedBackendIds));
+            details.put("All backends", getPrintableMap(allBackendIds));
             Gson gson = new Gson();
             return gson.toJson(details);
         }
+
+        private Map<String, List<Long>> getPrintableMap(Map<TUniqueId, List<Long>> map) {
+            Map<String, List<Long>> newMap = Maps.newHashMap();
+            for (Map.Entry<TUniqueId, List<Long>> entry : map.entrySet()) {
+                newMap.put(DebugUtil.printId(entry.getKey()), entry.getValue());
+            }
+            return newMap;
+        }
     }
 
     // only for log replay
@@ -254,12 +277,12 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         return transactionId;
     }
 
-    public void initScannedRows(TUniqueId loadId, Set<TUniqueId> fragmentIds) {
-        loadStatistic.initLoad(loadId, fragmentIds);
+    public void initLoadProgress(TUniqueId loadId, Set<TUniqueId> fragmentIds, List<Long> relatedBackendIds) {
+        loadStatistic.initLoad(loadId, fragmentIds, relatedBackendIds);
     }
 
-    public void updateScannedRows(TUniqueId loadId, TUniqueId fragmentId, long scannedRows) {
-        loadStatistic.updateLoad(loadId, fragmentId, scannedRows);
+    public void updateProgess(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows, boolean isDone) {
+        loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, isDone);
     }
 
     public void setLoadFileInfo(int fileNum, long fileSize) {
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 5104094..904bd94 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -582,17 +582,19 @@ public class LoadManager implements Writable{
         return false;
     }
 
-    public void initJobScannedRows(Long jobId, TUniqueId loadId, Set<TUniqueId> fragmentIds) {
+    public void initJobProgress(Long jobId, TUniqueId loadId, Set<TUniqueId> fragmentIds,
+            List<Long> relatedBackendIds) {
         LoadJob job = idToLoadJob.get(jobId);
         if (job != null) {
-            job.initScannedRows(loadId, fragmentIds);
+            job.initLoadProgress(loadId, fragmentIds, relatedBackendIds);
         }
     }
 
-    public void updateJobScannedRows(Long jobId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows) {
+    public void updateJobPrgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId,
+            long scannedRows, boolean isDone) {
         LoadJob job = idToLoadJob.get(jobId);
         if (job != null) {
-            job.updateScannedRows(loadId, fragmentId, scannedRows);
+            job.updateProgess(beId, loadId, fragmentId, scannedRows, isDone);
         }
     }
 
diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
index c409e97..01465c2 100644
--- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -123,6 +123,7 @@ public class Coordinator {
     // status or to CANCELLED, if Cancel() is called.
     Status queryStatus = new Status();
 
+    // save of related backends of this query
     Map<TNetworkAddress, Long> addressToBackendID = Maps.newHashMap();
 
     private ImmutableMap<Long, Backend> idToBackend = ImmutableMap.of();
@@ -410,7 +411,9 @@ public class Coordinator {
             this.queryOptions.setIs_report_success(true);
             deltaUrls = Lists.newArrayList();
             loadCounters = Maps.newHashMap();
-            Catalog.getCurrentCatalog().getLoadManager().initJobScannedRows(jobId, queryId, instanceIds);
+            List<Long> relatedBackendIds = Lists.newArrayList(addressToBackendID.values());
+            Catalog.getCurrentCatalog().getLoadManager().initJobProgress(jobId, queryId, instanceIds,
+                    relatedBackendIds);
         }
 
         // to keep things simple, make async Cancel() calls wait until plan fragment
@@ -470,7 +473,6 @@ public class Coordinator {
 
                     backendId++;
                 }
-
                 for (Pair<BackendExecState, Future<PExecPlanFragmentResult>> pair : futures) {
                     TStatusCode code = TStatusCode.INTERNAL_ERROR;
                     String errMsg = null;
@@ -1094,7 +1096,7 @@ public class Coordinator {
         }
     }
 
-    //To ensure the same bucketSeq tablet to the same execHostPort
+    // To ensure the same bucketSeq tablet to the same execHostPort
     private void computeScanRangeAssignmentByColocate(
             final OlapScanNode scanNode,
             FragmentScanRangeAssignment assignment) throws Exception {
@@ -1226,8 +1228,9 @@ public class Coordinator {
         }
 
         if (params.isSetLoaded_rows()) {
-            Catalog.getCurrentCatalog().getLoadManager().updateJobScannedRows(
-                    jobId, params.query_id, params.fragment_instance_id, params.loaded_rows);
+            Catalog.getCurrentCatalog().getLoadManager().updateJobPrgress(
+                    jobId, params.backend_id, params.query_id, params.fragment_instance_id, params.loaded_rows,
+                    params.done);
         }
 
         return;
diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 8e383da..138ff81 100644
--- a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -226,6 +226,7 @@ public class HeartbeatMgr extends MasterDaemon {
                 copiedMasterInfo.setBackend_ip(backend.getHost());
                 long flags = heartbeatFlags.getHeartbeatFlags();
                 copiedMasterInfo.setHeartbeat_flags(flags);
+                copiedMasterInfo.setBackend_id(backendId);
                 THeartbeatResult result = client.heartbeat(copiedMasterInfo);
 
                 ok = true;
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index b7a8c07..37ef324 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -374,6 +374,8 @@ struct TReportExecStatusParams {
   14: optional list<Types.TTabletCommitInfo> commitInfos
 
   15: optional i64 loaded_rows
+
+  16: optional i64 backend_id
 }
 
 struct TFeResult {
diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift
index 78db2a0..339d8a0 100644
--- a/gensrc/thrift/HeartbeatService.thrift
+++ b/gensrc/thrift/HeartbeatService.thrift
@@ -28,6 +28,7 @@ struct TMasterInfo {
     5: optional string backend_ip
     6: optional Types.TPort http_port
     7: optional i64 heartbeat_flags
+    8: optional i64 backend_id
 }
 
 struct TBackendInfo {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org