You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/11/24 02:08:43 UTC
[incubator-doris] branch master updated: display current load bytes to show load progress, (#7134)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d420ff0 display current load bytes to show load progress, (#7134)
d420ff0 is described below
commit d420ff0afdc1b85e4856d855e19d39133fc5f094
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Wed Nov 24 10:08:32 2021 +0800
display current load bytes to show load progress, (#7134)
this value may be greater than the file size when loading
a parquet or orc file, and will be less than the file size when loading
a csv file.
---
be/src/exec/tablet_sink.cpp | 12 ++++----
be/src/runtime/fragment_mgr.cpp | 18 ++++++------
.../apache/doris/load/loadv2/BrokerLoadJob.java | 10 +++++++
.../java/org/apache/doris/load/loadv2/LoadJob.java | 33 +++++++++++++++++++---
.../org/apache/doris/load/loadv2/LoadManager.java | 4 +--
.../org/apache/doris/load/loadv2/SparkLoadJob.java | 3 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 4 +--
.../apache/doris/load/loadv2/dpp/DppResult.java | 4 +++
gensrc/thrift/FrontendService.thrift | 2 ++
9 files changed, 68 insertions(+), 22 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 3e2db46..7ee4e86 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -707,13 +707,15 @@ Status OlapTableSink::open(RuntimeState* state) {
Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
SCOPED_TIMER(_profile->total_time_counter());
- _number_input_rows += input_batch->num_rows();
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
- state->update_num_rows_load_total(input_batch->num_rows());
- state->update_num_bytes_load_total(input_batch->total_byte_size());
- DorisMetrics::instance()->load_rows->increment(input_batch->num_rows());
- DorisMetrics::instance()->load_bytes->increment(input_batch->total_byte_size());
+ int64_t num_rows = input_batch->num_rows();
+ int64_t num_bytes = input_batch->total_byte_size();
+ _number_input_rows += num_rows;
+ state->update_num_rows_load_total(num_rows);
+ state->update_num_bytes_load_total(num_bytes);
+ DorisMetrics::instance()->load_rows->increment(num_rows);
+ DorisMetrics::instance()->load_bytes->increment(num_bytes);
RowBatch* batch = input_batch;
if (!_output_expr_ctxs.empty()) {
SCOPED_RAW_TIMER(&_convert_batch_ns);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9719e6c..dab2046 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -307,9 +307,11 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
if (runtime_state->query_options().query_type == TQueryType::LOAD && !done && status.ok()) {
// this is a load plan, and load is not finished, just make a brief report
params.__set_loaded_rows(runtime_state->num_rows_load_total());
+ params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
} else {
if (runtime_state->query_options().query_type == TQueryType::LOAD) {
params.__set_loaded_rows(runtime_state->num_rows_load_total());
+ params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
}
if (profile == nullptr) {
params.__isset.profile = false;
@@ -373,11 +375,9 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
try {
coord->reportExecStatus(res, params);
} catch (TTransportException& e) {
- LOG(WARNING) << "Retrying ReportExecStatus. query id: "
- << print_id(_query_id) << ", instance id: "
- << print_id(_fragment_instance_id)
- << " to " << _coord_addr
- << ", err: " << e.what();
+ LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(_query_id)
+ << ", instance id: " << print_id(_fragment_instance_id) << " to "
+ << _coord_addr << ", err: " << e.what();
rpc_status = coord.reopen();
if (!rpc_status.ok()) {
@@ -452,9 +452,11 @@ FragmentMgr::~FragmentMgr() {
static void empty_function(PlanFragmentExecutor* exec) {}
void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb) {
- TAG(LOG(INFO)).log("PlanFragmentExecutor::_exec_actual")
- .query_id(exec_state->query_id()).instance_id(exec_state->fragment_instance_id())
- .tag("pthread_id", std::to_string((uintptr_t) pthread_self()));
+ TAG(LOG(INFO))
+ .log("PlanFragmentExecutor::_exec_actual")
+ .query_id(exec_state->query_id())
+ .instance_id(exec_state->fragment_instance_id())
+ .tag("pthread_id", std::to_string((uintptr_t)pthread_self()));
exec_state->execute();
std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index b57a2de..96cf48d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -346,6 +346,16 @@ public class BrokerLoadJob extends BulkLoadJob {
}
}
+ @Override
+ public void updateProgress(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+ long scannedBytes, boolean isDone) {
+ super.updateProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+ progress = (int) ((double) loadStatistic.getLoadBytes() / loadStatistic.totalFileSizeB * 100);
+ if (progress >= 100) {
+ progress = 99;
+ }
+ }
+
private String increaseCounter(String key, String deltaValue) {
long value = 0;
if (loadingStatus.getCounters().containsKey(key)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index c5c97b3..1d86239 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -136,6 +136,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
// load task id -> fragment id -> rows count
private Table<TUniqueId, TUniqueId, Long> counterTbl = HashBasedTable.create();
+ // load task id -> fragment id -> load bytes
+ private Table<TUniqueId, TUniqueId, Long> loadBytes = HashBasedTable.create();
+
// load task id -> unfinished backend id list
private Map<TUniqueId, List<Long>> unfinishedBackendIds = Maps.newHashMap();
// load task id -> all backend id list
@@ -151,6 +154,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
for (TUniqueId fragId : fragmentIds) {
counterTbl.put(loadId, fragId, 0L);
}
+ loadBytes.rowMap().remove(loadId);
+ for (TUniqueId fragId : fragmentIds) {
+ loadBytes.put(loadId, fragId, 0L);
+ }
allBackendIds.put(loadId, relatedBackendIds);
// need to get a copy of relatedBackendIds, so that when we modify the "relatedBackendIds" in
// allBackendIds, the list in unfinishedBackendIds will not be changed.
@@ -159,15 +166,20 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
public synchronized void removeLoad(TUniqueId loadId) {
counterTbl.rowMap().remove(loadId);
+ loadBytes.rowMap().remove(loadId);
unfinishedBackendIds.remove(loadId);
allBackendIds.remove(loadId);
}
public synchronized void updateLoadProgress(long backendId, TUniqueId loadId, TUniqueId fragmentId,
- long rows, boolean isDone) {
+ long rows, long bytes, boolean isDone) {
if (counterTbl.contains(loadId, fragmentId)) {
counterTbl.put(loadId, fragmentId, rows);
}
+
+ if (loadBytes.contains(loadId, fragmentId)) {
+ loadBytes.put(loadId, fragmentId, bytes);
+ }
if (isDone && unfinishedBackendIds.containsKey(loadId)) {
unfinishedBackendIds.get(loadId).remove(backendId);
}
@@ -181,18 +193,30 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
return total;
}
+ public synchronized long getLoadBytes() {
+ long total = 0;
+ for (long bytes : loadBytes.values()) {
+ total += bytes;
+ }
+ return total;
+ }
+
public synchronized String toJson() {
long total = 0;
for (long rows : counterTbl.values()) {
total += rows;
}
+ long totalBytes = 0;
+ for (long bytes : loadBytes.values()) {
+ totalBytes += bytes;
+ }
Map<String, Object> details = Maps.newHashMap();
details.put("ScannedRows", total);
+ details.put("LoadBytes", totalBytes);
details.put("FileNumber", fileNum);
details.put("FileSize", totalFileSizeB);
details.put("TaskNumber", counterTbl.rowMap().size());
- details.put("TaskNumber", counterTbl.rowMap().size());
details.put("Unfinished backends", getPrintableMap(unfinishedBackendIds));
details.put("All backends", getPrintableMap(allBackendIds));
Gson gson = new Gson();
@@ -284,8 +308,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
loadStatistic.initLoad(loadId, fragmentIds, relatedBackendIds);
}
- public void updateProgress(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows, boolean isDone) {
- loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, isDone);
+ public void updateProgress(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+ long scannedBytes, boolean isDone) {
+ loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
}
public void setLoadFileInfo(int fileNum, long fileSize) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 0c71559..42aea02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -675,10 +675,10 @@ public class LoadManager implements Writable{
}
public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId,
- long scannedRows, boolean isDone) {
+ long scannedRows, long scannedBytes, boolean isDone) {
LoadJob job = idToLoadJob.get(jobId);
if (job != null) {
- job.updateProgress(beId, loadId, fragmentId, scannedRows, isDone);
+ job.updateProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index cb38cbf..9185437 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -331,7 +331,8 @@ public class SparkLoadJob extends BulkLoadJob {
TUniqueId dummyId = new TUniqueId(0, 0);
long dummyBackendId = -1L;
loadStatistic.initLoad(dummyId, Sets.newHashSet(dummyId), Lists.newArrayList(dummyBackendId));
- loadStatistic.updateLoadProgress(dummyBackendId, dummyId, dummyId, dppResult.scannedRows, true);
+ loadStatistic.updateLoadProgress(dummyBackendId, dummyId, dummyId, dppResult.scannedRows,
+ dppResult.scannedBytes, true);
Map<String, String> counters = loadingStatus.getCounters();
counters.put(DPP_NORMAL_ALL, String.valueOf(dppResult.normalRows));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ad0a9d9..f1adb35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1538,8 +1538,8 @@ public class Coordinator {
if (params.isSetLoadedRows()) {
Catalog.getCurrentCatalog().getLoadManager().updateJobProgress(
- jobId, params.backend_id, params.query_id, params.fragment_instance_id, params.loaded_rows,
- params.done);
+ jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
+ params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
}
}
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
index fa813ca..037cc9b 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
@@ -32,6 +32,7 @@ public class DppResult implements Serializable {
abnormalRows = 0;
unselectRows = 0;
partialAbnormalRows = "";
+ scannedBytes = 0;
}
@SerializedName("is_success")
@@ -61,4 +62,7 @@ public class DppResult implements Serializable {
// only part of abnormal rows will be returned
@SerializedName("partial_abnormal_rows")
public String partialAbnormalRows;
+
+ @SerializedName("scanned_bytes")
+ public long scannedBytes;
}
\ No newline at end of file
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 374e282..013ac0d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -398,6 +398,8 @@ struct TReportExecStatusParams {
15: optional i64 loaded_rows
16: optional i64 backend_id
+
+ 17: optional i64 loaded_bytes
}
struct TFeResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org