You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/19 06:15:15 UTC

[incubator-doris] branch master updated: [Improvement] spark load without agg and de/serialization (#6270)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c65ec31  [Improvement] spark load without agg and de/serialization  (#6270)
c65ec31 is described below

commit c65ec3136bdd158ab35ed9bdc951cfada6552e9b
Author: Xiang Wei <we...@gmail.com>
AuthorDate: Thu Aug 19 14:15:01 2021 +0800

    [Improvement] spark load without agg and de/serialization  (#6270)
    
    fix #6269
    
    The goal of these changes is to reduce memory usage in BE (to avoid OOM) and to speed up the calculation.
    1. We do not need to do aggregation during the load, because it has already been done in the Spark ETL job.
    2. Based on 1, we no longer need to serialize/deserialize bitmap/HLL objects.
---
 be/src/olap/push_handler.cpp | 82 ++++++++++++++++++++++++++++++++++++++------
 be/src/olap/push_handler.h   |  1 +
 2 files changed, 72 insertions(+), 11 deletions(-)

diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index b6a1ad3..0b9d3ac 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -150,7 +150,7 @@ OLAPStatus PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TP
     if (res != OLAP_SUCCESS) {
         LOG(WARNING) << "fail to convert tmp file when realtime push. res=" << res
                      << ", failed to process realtime push."
-                     << ", table=" << tablet->full_name()
+                     << ", tablet=" << tablet->full_name()
                      << ", transaction_id=" << request.transaction_id;
         for (TabletVars& tablet_var : *tablet_vars) {
             if (tablet_var.tablet == nullptr) {
@@ -944,6 +944,70 @@ OLAPStatus PushBrokerReader::init(const Schema* schema, const TBrokerScanRange&
     return OLAP_SUCCESS;
 }
 
+OLAPStatus PushBrokerReader::fill_field_row(RowCursorCell* dst, const char* src, bool src_null, MemPool* mem_pool, FieldType type){
+    switch (type) {
+        case OLAP_FIELD_TYPE_DECIMAL: {
+            dst->set_is_null(src_null);
+            if (src_null) {
+                break;
+            }
+            auto *decimal_value = reinterpret_cast<const DecimalV2Value *>(src);
+            auto *storage_decimal_value = reinterpret_cast<decimal12_t *>(dst->mutable_cell_ptr());
+            storage_decimal_value->integer = decimal_value->int_value();
+            storage_decimal_value->fraction = decimal_value->frac_value();
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATETIME: {
+            dst->set_is_null(src_null);
+            if (src_null) {
+                break;
+            }
+
+            auto* datetime_value = reinterpret_cast<const DateTimeValue*>(src);
+            auto* storage_datetime_value = reinterpret_cast<uint64_t*>(dst->mutable_cell_ptr());
+            *storage_datetime_value = datetime_value->to_olap_datetime();
+            break;
+        }
+
+        case OLAP_FIELD_TYPE_DATE: {
+            dst->set_is_null(src_null);
+            if (src_null) {
+                break;
+            }
+
+            auto* date_value = reinterpret_cast<const DateTimeValue*>(src);
+            auto* storage_date_value = reinterpret_cast<uint24_t*>(dst->mutable_cell_ptr());
+            *storage_date_value = static_cast<int64_t>(date_value->to_olap_date());
+            break;
+        }
+        case OLAP_FIELD_TYPE_BOOL:
+        case OLAP_FIELD_TYPE_TINYINT:
+        case OLAP_FIELD_TYPE_SMALLINT:
+        case OLAP_FIELD_TYPE_INT:
+        case OLAP_FIELD_TYPE_UNSIGNED_INT:
+        case OLAP_FIELD_TYPE_BIGINT:
+        case OLAP_FIELD_TYPE_LARGEINT:
+        case OLAP_FIELD_TYPE_FLOAT:
+        case OLAP_FIELD_TYPE_DOUBLE:
+        case OLAP_FIELD_TYPE_CHAR:
+        case OLAP_FIELD_TYPE_VARCHAR:
+        case OLAP_FIELD_TYPE_HLL:
+        case OLAP_FIELD_TYPE_OBJECT:{
+            dst->set_is_null(src_null);
+            if (src_null) {
+                break;
+            }
+            const TypeInfo* type_info = get_type_info(type);
+            type_info->deep_copy(dst->mutable_cell_ptr(), src, mem_pool);
+            break;
+        }
+        default:
+            return OLAP_ERR_INVALID_SCHEMA;
+    }
+
+    return OLAP_SUCCESS;
+}
+
 OLAPStatus PushBrokerReader::next(ContiguousRow* row) {
     if (!_ready || row == nullptr) {
         return OLAP_ERR_INPUT_PARAMETER_ERROR;
@@ -961,22 +1025,18 @@ OLAPStatus PushBrokerReader::next(ContiguousRow* row) {
     }
 
     auto slot_descs = _tuple_desc->slots();
-    size_t num_key_columns = _schema->num_key_columns();
-
     // finalize row
     for (size_t i = 0; i < slot_descs.size(); ++i) {
         auto cell = row->cell(i);
         const SlotDescriptor* slot = slot_descs[i];
         bool is_null = _tuple->is_null(slot->null_indicator_offset());
         const void* value = _tuple->get_slot(slot->tuple_offset());
-        // try execute init method defined in aggregateInfo
-        // by default it only copies data into cell
-        _schema->column(i)->consume(&cell, (const char*)value, is_null, _mem_pool.get(),
-                                    _runtime_state->obj_pool());
-        // if column(i) is a value column, try execute finalize method defined in aggregateInfo
-        // to convert data into final format
-        if (i >= num_key_columns) {
-            _schema->column(i)->agg_finalize(&cell, _mem_pool.get());
+
+        FieldType type = _schema->column(i)->type();
+        OLAPStatus field_status = fill_field_row(&cell, (const char*)value, is_null, _mem_pool.get(), type);
+        if (field_status!= OLAP_SUCCESS) {
+            LOG(WARNING) << "fill field row failed in spark load, slot index: " << i << ", type: " << type;
+            return OLAP_ERR_SCHEMA_SCHEMA_FIELD_INVALID;
         }
     }
 
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index 7f8d0aa..e1fb787 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -200,6 +200,7 @@ public:
     MemPool* mem_pool() { return _mem_pool.get(); }
 
 private:
+    OLAPStatus fill_field_row(RowCursorCell* dst, const char* src,bool src_null, MemPool* mem_pool, FieldType type);
     bool _ready;
     bool _eof;
     TupleDescriptor* _tuple_desc;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org