You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/06/04 13:00:53 UTC

[GitHub] [incubator-doris] wyb commented on a change in pull request #3742: [Spark load][Be 1/1] Be handle push task

wyb commented on a change in pull request #3742:
URL: https://github.com/apache/incubator-doris/pull/3742#discussion_r435205288



##########
File path: be/src/olap/push_handler.cpp
##########
@@ -248,6 +259,149 @@ void PushHandler::_get_tablet_infos(const vector<TabletVars>& tablet_vars,
   }
 }
 
+OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet,
+                                    TabletSharedPtr new_tablet,
+                                    RowsetSharedPtr* cur_rowset,
+                                    RowsetSharedPtr* new_rowset) {
+    OLAPStatus res = OLAP_SUCCESS;
+    PushBrokerReader* reader = nullptr;
+    Schema* schema = nullptr;
+    uint32_t num_rows = 0;
+    PUniqueId load_id;
+    load_id.set_hi(0);
+    load_id.set_lo(0);
+
+    do {
+        VLOG(3) << "start to convert delta file.";
+
+        // 1. init RowsetBuilder of cur_tablet for current push
+        VLOG(3) << "init rowset builder. tablet=" << cur_tablet->full_name()
+            << ", block_row_size=" << cur_tablet->num_rows_per_row_block();
+        RowsetWriterContext context;
+        context.rowset_id = StorageEngine::instance()->next_rowset_id();
+        context.tablet_uid = cur_tablet->tablet_uid();
+        context.tablet_id = cur_tablet->tablet_id();
+        context.partition_id = _request.partition_id;
+        context.tablet_schema_hash = cur_tablet->schema_hash();
+        context.rowset_type = StorageEngine::instance()->default_rowset_type();
+        context.rowset_path_prefix = cur_tablet->tablet_path();
+        context.tablet_schema = &(cur_tablet->tablet_schema());
+        context.rowset_state = PREPARED;
+        context.txn_id = _request.transaction_id;
+        context.load_id = load_id;
+        // Although the spark load output files are fully sorted, that
+        // depends on the third-party implementation, so we conservatively
+        // set this value to OVERLAP_UNKNOWN.
+        context.segments_overlap = OVERLAP_UNKNOWN;
+
+        std::unique_ptr<RowsetWriter> rowset_writer;
+        res = RowsetFactory::create_rowset_writer(context, &rowset_writer);
+        if (OLAP_SUCCESS != res) {
+            LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name()
+                         << ", txn_id=" << _request.transaction_id
+                         << ", res=" << res;
+            break;
+        }
+
+        // 2. Init PushBrokerReader to read broker file if exist,
+        //    in case of empty push this will be skipped.
+        std::string path = _request.broker_scan_range.ranges[0].path;
+        LOG(INFO) << "tablet=" << cur_tablet->full_name() << ", file path=" << path
+                  << ", file size=" << _request.broker_scan_range.ranges[0].file_size;
+        if (!path.empty()) {
+            reader = new(std::nothrow) PushBrokerReader();
+            if (reader == nullptr) {
+                LOG(WARNING) << "fail to create reader. tablet=" << cur_tablet->full_name();
+                res = OLAP_ERR_MALLOC_ERROR;
+                break;
+            }
+
+            // init schema
+            schema = new(std::nothrow) Schema(cur_tablet->tablet_schema());
+            if (schema == nullptr) {
+                LOG(WARNING) << "fail to create schema. tablet=" << cur_tablet->full_name();
+                res = OLAP_ERR_MALLOC_ERROR;
+                break;
+            }
+
+            // init Reader
+            if (OLAP_SUCCESS != (res = reader->init(schema, 
+                                                    _request.broker_scan_range,
+                                                    _request.desc_tbl))) {
+                LOG(WARNING) << "fail to init reader. res=" << res
+                             << ", tablet=" << cur_tablet->full_name();
+                res = OLAP_ERR_PUSH_INIT_ERROR;
+                break;
+            }
+
+            // 3. Init Row
+            uint8_t* tuple_buf = reader->mem_pool()->allocate(schema->schema_size());
+            ContiguousRow row(schema, tuple_buf);
+
+            // 4. Read data from broker and write into SegmentGroup of cur_tablet
+            // Convert from raw to delta
+            VLOG(3) << "start to convert etl file to delta.";
+            while (!reader->eof()) {
+                res = reader->next(&row);
+                if (OLAP_SUCCESS != res) {
+                    LOG(WARNING) << "read next row failed."
+                        << " res=" << res << " read_rows=" << num_rows;
+                    break;
+                } else {
+                    if (reader->eof()) {
+                        break;
+                    }
+                    if (OLAP_SUCCESS != (res = rowset_writer->add_row(row))) {
+                        LOG(WARNING) << "fail to attach row to rowset_writer. "
+                            << "res=" << res
+                            << ", tablet=" << cur_tablet->full_name()
+                            << ", read_rows=" << num_rows;
+                        break;
+                    }
+                    num_rows++;
+                }
+            }
+
+            reader->print_profile();
+            reader->finalize();

Review comment:
       Should call `reader->close()` here instead of `reader->finalize()`, since this reader is a PushBrokerReader.

##########
File path: be/src/olap/push_handler.cpp
##########
@@ -368,7 +522,7 @@ OLAPStatus PushHandler::_convert(TabletSharedPtr cur_tablet,
                 }
             }
 
-            reader->finalize();
+            reader->close();

Review comment:
       IBinaryReader should keep using `finalize()` here; this change to `close()` should be reverted.

##########
File path: be/src/olap/push_handler.cpp
##########
@@ -761,6 +915,126 @@ OLAPStatus LzoBinaryReader::_next_block() {
   return res;
 }
 
+// Prepares the reader for one broker push. It:
+//   - builds a standalone RuntimeState (all-zero query/fragment ids);
+//   - builds the descriptor table from `t_desc_tbl`;
+//   - creates the runtime profile, mem tracker, mem pool and scanner counter;
+//   - creates and opens a scanner chosen by the first range's file format;
+//   - allocates the destination tuple buffer that next() fills row by row.
+//
+// @param schema       Tablet schema used by next(); stored as a raw pointer
+//                     (presumably owned and kept alive by the caller — TODO confirm).
+// @param t_scan_range Broker scan range; must contain at least one range.
+//                     Only ranges[0].format_type selects the scanner, and only
+//                     FORMAT_PARQUET is supported.
+// @param t_desc_tbl   Descriptor table that must contain params.dest_tuple_id.
+// @return OLAP_SUCCESS on success, OLAP_ERR_PUSH_INIT_ERROR on any failure.
+OLAPStatus PushBrokerReader::init(const Schema* schema,
+                                  const TBrokerScanRange& t_scan_range,
+                                  const TDescriptorTable& t_desc_tbl) {
+    // The format lookup below reads ranges[0]; reject an empty range list
+    // instead of indexing out of bounds.
+    if (t_scan_range.ranges.empty()) {
+        LOG(WARNING) << "Broker scan range contains no file ranges";
+        return OLAP_ERR_PUSH_INIT_ERROR;
+    }
+
+    // init schema
+    _schema = schema;
+
+    // init runtime state, runtime profile, counter.
+    // All-zero placeholder ids are used for the fragment, query and mem trackers.
+    TUniqueId dummy_id;
+    dummy_id.hi = 0;
+    dummy_id.lo = 0;
+    TPlanFragmentExecParams params;
+    params.fragment_instance_id = dummy_id;
+    params.query_id = dummy_id;
+    TExecPlanFragmentParams fragment_params;
+    fragment_params.params = params;
+    fragment_params.protocol_version = PaloInternalServiceVersion::V1;
+    TQueryOptions query_options;
+    TQueryGlobals query_globals;
+    _runtime_state.reset(new RuntimeState(fragment_params, query_options, query_globals,
+                                          ExecEnv::GetInstance()));
+    DescriptorTbl* desc_tbl = nullptr;
+    Status status = DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &desc_tbl);
+    if (UNLIKELY(!status.ok())) {
+        LOG(WARNING) << "Failed to create descriptor table, msg: " << status.get_error_msg();
+        return OLAP_ERR_PUSH_INIT_ERROR;
+    }
+    _runtime_state->set_desc_tbl(desc_tbl);
+    status = _runtime_state->init_mem_trackers(dummy_id);
+    if (UNLIKELY(!status.ok())) {
+        LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg();
+        return OLAP_ERR_PUSH_INIT_ERROR;
+    }
+    _runtime_profile.reset(new RuntimeProfile(_runtime_state->obj_pool(), "PushBrokerReader"));
+    _mem_tracker.reset(new MemTracker(-1));
+    _mem_pool.reset(new MemPool(_mem_tracker.get()));
+    _counter.reset(new ScannerCounter());
+
+    // init scanner; the file format is taken from the first range only.
+    switch (t_scan_range.ranges[0].format_type) {
+    case TFileFormatType::FORMAT_PARQUET:
+        _scanner.reset(new ParquetScanner(_runtime_state.get(),
+                                          _runtime_profile.get(),
+                                          t_scan_range.params,
+                                          t_scan_range.ranges,
+                                          t_scan_range.broker_addresses,
+                                          _counter.get()));
+        break;
+    default:
+        LOG(WARNING) << "Unsupported file format type: " << t_scan_range.ranges[0].format_type;
+        return OLAP_ERR_PUSH_INIT_ERROR;
+    }
+    status = _scanner->open();
+    if (UNLIKELY(!status.ok())) {
+        LOG(WARNING) << "Failed to open scanner, msg: " << status.get_error_msg();
+        return OLAP_ERR_PUSH_INIT_ERROR;
+    }
+
+    // init tuple
+    auto tuple_id = t_scan_range.params.dest_tuple_id;
+    _tuple_desc = _runtime_state->desc_tbl().get_tuple_descriptor(tuple_id);
+    if (_tuple_desc == nullptr) {
+        LOG(WARNING) << "Failed to get tuple descriptor, tuple_id: " << tuple_id;
+        return OLAP_ERR_PUSH_INIT_ERROR;
+    }
+
+    // A single destination tuple buffer; next() refills it for every row.
+    int tuple_buffer_size = _tuple_desc->byte_size();
+    void* tuple_buffer = _mem_pool->allocate(tuple_buffer_size);
+    if (tuple_buffer == nullptr) {
+        LOG(WARNING) << "Allocate memory for tuple failed";
+        return OLAP_ERR_PUSH_INIT_ERROR;
+    }
+    _tuple = reinterpret_cast<Tuple*>(tuple_buffer);
+
+    _ready = true;
+    return OLAP_SUCCESS;
+}
+
+OLAPStatus PushBrokerReader::next(ContiguousRow* row) {
+    if (!_ready || row == nullptr) {
+        return OLAP_ERR_INPUT_PARAMETER_ERROR;
+    }
+
+    memset(_tuple, 0, _tuple_desc->num_null_bytes());
+    // Get from scanner
+    Status status = _scanner->get_next(_tuple, _mem_pool.get(), &_eof);
+    if (UNLIKELY(!status.ok())) {
+        LOG(WARNING) << "Scanner get next tuple failed";
+        return OLAP_ERR_PUSH_INPUT_DATA_ERROR;
+    }
+    if (_eof) {
+        return OLAP_SUCCESS;
+    }
+    //LOG(INFO) << "row data: " << _tuple->to_string(*_tuple_desc);
+
+    auto slot_descs = _tuple_desc->slots();
+    size_t num_key_columns = _schema->num_key_columns();
+    for (size_t i = 0; i < slot_descs.size(); ++i) {
+        auto cell = row->cell(i);
+        const SlotDescriptor* slot = slot_descs[i];
+        bool is_null = _tuple->is_null(slot->null_indicator_offset());
+        const void* value = _tuple->get_slot(slot->tuple_offset());
+        _schema->column(i)->consume(&cell, (const char*)value, is_null, 
+                                    _mem_pool.get(), _runtime_state->obj_pool());
+        if (i >= num_key_columns) {
+            _schema->column(i)->agg_finalize(&cell, _mem_pool.get());

Review comment:
       Later you can improve this by copying the buffer directly




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org