You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/11/24 02:08:43 UTC

[incubator-doris] branch master updated: display current load bytes to show load progress, (#7134)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d420ff0  display current load bytes to show load progress, (#7134)
d420ff0 is described below

commit d420ff0afdc1b85e4856d855e19d39133fc5f094
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Wed Nov 24 10:08:32 2021 +0800

    display current load bytes to show load progress, (#7134)
    
    this value may be greater than the file size when loading
    parquet or orc files, and will be less than the file size when
    loading csv files.
---
 be/src/exec/tablet_sink.cpp                        | 12 ++++----
 be/src/runtime/fragment_mgr.cpp                    | 18 ++++++------
 .../apache/doris/load/loadv2/BrokerLoadJob.java    | 10 +++++++
 .../java/org/apache/doris/load/loadv2/LoadJob.java | 33 +++++++++++++++++++---
 .../org/apache/doris/load/loadv2/LoadManager.java  |  4 +--
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |  3 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |  4 +--
 .../apache/doris/load/loadv2/dpp/DppResult.java    |  4 +++
 gensrc/thrift/FrontendService.thrift               |  2 ++
 9 files changed, 68 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 3e2db46..7ee4e86 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -707,13 +707,15 @@ Status OlapTableSink::open(RuntimeState* state) {
 
 Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
     SCOPED_TIMER(_profile->total_time_counter());
-    _number_input_rows += input_batch->num_rows();
     // update incrementally so that FE can get the progress.
     // the real 'num_rows_load_total' will be set when sink being closed.
-    state->update_num_rows_load_total(input_batch->num_rows());
-    state->update_num_bytes_load_total(input_batch->total_byte_size());
-    DorisMetrics::instance()->load_rows->increment(input_batch->num_rows());
-    DorisMetrics::instance()->load_bytes->increment(input_batch->total_byte_size());
+    int64_t num_rows = input_batch->num_rows();
+    int64_t num_bytes = input_batch->total_byte_size();
+    _number_input_rows += num_rows;
+    state->update_num_rows_load_total(num_rows);
+    state->update_num_bytes_load_total(num_bytes);
+    DorisMetrics::instance()->load_rows->increment(num_rows);
+    DorisMetrics::instance()->load_bytes->increment(num_bytes);
     RowBatch* batch = input_batch;
     if (!_output_expr_ctxs.empty()) {
         SCOPED_RAW_TIMER(&_convert_batch_ns);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9719e6c..dab2046 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -307,9 +307,11 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
     if (runtime_state->query_options().query_type == TQueryType::LOAD && !done && status.ok()) {
         // this is a load plan, and load is not finished, just make a brief report
         params.__set_loaded_rows(runtime_state->num_rows_load_total());
+        params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
     } else {
         if (runtime_state->query_options().query_type == TQueryType::LOAD) {
             params.__set_loaded_rows(runtime_state->num_rows_load_total());
+            params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
         }
         if (profile == nullptr) {
             params.__isset.profile = false;
@@ -373,11 +375,9 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
         try {
             coord->reportExecStatus(res, params);
         } catch (TTransportException& e) {
-            LOG(WARNING) << "Retrying ReportExecStatus. query id: "
-                         << print_id(_query_id) << ", instance id: "
-                         << print_id(_fragment_instance_id)
-                         << " to " << _coord_addr
-                         << ", err: " << e.what();
+            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(_query_id)
+                         << ", instance id: " << print_id(_fragment_instance_id) << " to "
+                         << _coord_addr << ", err: " << e.what();
             rpc_status = coord.reopen();
 
             if (!rpc_status.ok()) {
@@ -452,9 +452,11 @@ FragmentMgr::~FragmentMgr() {
 static void empty_function(PlanFragmentExecutor* exec) {}
 
 void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb) {
-    TAG(LOG(INFO)).log("PlanFragmentExecutor::_exec_actual")
-            .query_id(exec_state->query_id()).instance_id(exec_state->fragment_instance_id())
-            .tag("pthread_id", std::to_string((uintptr_t) pthread_self()));
+    TAG(LOG(INFO))
+            .log("PlanFragmentExecutor::_exec_actual")
+            .query_id(exec_state->query_id())
+            .instance_id(exec_state->fragment_instance_id())
+            .tag("pthread_id", std::to_string((uintptr_t)pthread_self()));
     exec_state->execute();
 
     std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index b57a2de..96cf48d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -346,6 +346,16 @@ public class BrokerLoadJob extends BulkLoadJob {
         }
     }
 
+    @Override
+    public void updateProgress(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+                               long scannedBytes, boolean isDone) {
+        super.updateProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+        progress = (int) ((double) loadStatistic.getLoadBytes() / loadStatistic.totalFileSizeB * 100);
+        if (progress >= 100) {
+            progress = 99;
+        }
+    }
+
     private String increaseCounter(String key, String deltaValue) {
         long value = 0;
         if (loadingStatus.getCounters().containsKey(key)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index c5c97b3..1d86239 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -136,6 +136,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         // load task id -> fragment id -> rows count
         private Table<TUniqueId, TUniqueId, Long> counterTbl = HashBasedTable.create();
 
+        // load task id -> fragment id -> load bytes
+        private Table<TUniqueId, TUniqueId, Long> loadBytes = HashBasedTable.create();
+
         // load task id -> unfinished backend id list
         private Map<TUniqueId, List<Long>> unfinishedBackendIds = Maps.newHashMap();
         // load task id -> all backend id list
@@ -151,6 +154,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             for (TUniqueId fragId : fragmentIds) {
                 counterTbl.put(loadId, fragId, 0L);
             }
+            loadBytes.rowMap().remove(loadId);
+            for (TUniqueId fragId : fragmentIds) {
+                loadBytes.put(loadId, fragId, 0L);
+            }
             allBackendIds.put(loadId, relatedBackendIds);
             // need to get a copy of relatedBackendIds, so that when we modify the "relatedBackendIds" in
             // allBackendIds, the list in unfinishedBackendIds will not be changed.
@@ -159,15 +166,20 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
 
         public synchronized void removeLoad(TUniqueId loadId) {
             counterTbl.rowMap().remove(loadId);
+            loadBytes.rowMap().remove(loadId);
             unfinishedBackendIds.remove(loadId);
             allBackendIds.remove(loadId);
         }
 
         public synchronized void updateLoadProgress(long backendId, TUniqueId loadId, TUniqueId fragmentId,
-                                                    long rows, boolean isDone) {
+                                                    long rows, long bytes, boolean isDone) {
             if (counterTbl.contains(loadId, fragmentId)) {
                 counterTbl.put(loadId, fragmentId, rows);
             }
+
+            if (loadBytes.contains(loadId, fragmentId)) {
+                loadBytes.put(loadId, fragmentId, bytes);
+            }
             if (isDone && unfinishedBackendIds.containsKey(loadId)) {
                 unfinishedBackendIds.get(loadId).remove(backendId);
             }
@@ -181,18 +193,30 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             return total;
         }
 
+        public synchronized long getLoadBytes() {
+            long total = 0;
+            for (long bytes : loadBytes.values()) {
+                total += bytes;
+            }
+            return total;
+        }
+
         public synchronized String toJson() {
             long total = 0;
             for (long rows : counterTbl.values()) {
                 total += rows;
             }
+            long totalBytes = 0;
+            for (long bytes : loadBytes.values()) {
+                totalBytes += bytes;
+            }
 
             Map<String, Object> details = Maps.newHashMap();
             details.put("ScannedRows", total);
+            details.put("LoadBytes", totalBytes);
             details.put("FileNumber", fileNum);
             details.put("FileSize", totalFileSizeB);
             details.put("TaskNumber", counterTbl.rowMap().size());
-            details.put("TaskNumber", counterTbl.rowMap().size());
             details.put("Unfinished backends", getPrintableMap(unfinishedBackendIds));
             details.put("All backends", getPrintableMap(allBackendIds));
             Gson gson = new Gson();
@@ -284,8 +308,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         loadStatistic.initLoad(loadId, fragmentIds, relatedBackendIds);
     }
 
-    public void updateProgress(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows, boolean isDone) {
-        loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, isDone);
+    public void updateProgress(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+                               long scannedBytes, boolean isDone) {
+        loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
     }
 
     public void setLoadFileInfo(int fileNum, long fileSize) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 0c71559..42aea02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -675,10 +675,10 @@ public class LoadManager implements Writable{
     }
 
     public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId,
-                                  long scannedRows, boolean isDone) {
+                                  long scannedRows, long scannedBytes, boolean isDone) {
         LoadJob job = idToLoadJob.get(jobId);
         if (job != null) {
-            job.updateProgress(beId, loadId, fragmentId, scannedRows, isDone);
+            job.updateProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index cb38cbf..9185437 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -331,7 +331,8 @@ public class SparkLoadJob extends BulkLoadJob {
             TUniqueId dummyId = new TUniqueId(0, 0);
             long dummyBackendId = -1L;
             loadStatistic.initLoad(dummyId, Sets.newHashSet(dummyId), Lists.newArrayList(dummyBackendId));
-            loadStatistic.updateLoadProgress(dummyBackendId, dummyId, dummyId, dppResult.scannedRows, true);
+            loadStatistic.updateLoadProgress(dummyBackendId, dummyId, dummyId, dppResult.scannedRows,
+                    dppResult.scannedBytes, true);
 
             Map<String, String> counters = loadingStatus.getCounters();
             counters.put(DPP_NORMAL_ALL, String.valueOf(dppResult.normalRows));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ad0a9d9..f1adb35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1538,8 +1538,8 @@ public class Coordinator {
 
         if (params.isSetLoadedRows()) {
             Catalog.getCurrentCatalog().getLoadManager().updateJobProgress(
-                    jobId, params.backend_id, params.query_id, params.fragment_instance_id, params.loaded_rows,
-                    params.done);
+                    jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
+                    params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
         }
     }
 
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
index fa813ca..037cc9b 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
@@ -32,6 +32,7 @@ public class DppResult implements Serializable {
         abnormalRows = 0;
         unselectRows = 0;
         partialAbnormalRows = "";
+        scannedBytes = 0;
     }
 
     @SerializedName("is_success")
@@ -61,4 +62,7 @@ public class DppResult implements Serializable {
     // only part of abnormal rows will be returned
     @SerializedName("partial_abnormal_rows")
     public String partialAbnormalRows;
+
+    @SerializedName("scanned_bytes")
+    public long scannedBytes;
 }
\ No newline at end of file
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 374e282..013ac0d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -398,6 +398,8 @@ struct TReportExecStatusParams {
   15: optional i64 loaded_rows
 
   16: optional i64 backend_id
+
+  17: optional i64 loaded_bytes
 }
 
 struct TFeResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org