You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/11/29 13:05:24 UTC
[incubator-doris] branch master updated: Check the return status of `_flush_memtable_async()` (#2332)
This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new c5f7f7e Check the return status of `_flush_memtable_async()` (#2332)
c5f7f7e is described below
commit c5f7f7e0f4d5baa841054880155d9b0632c92411
Author: LingBin <li...@gmail.com>
AuthorDate: Fri Nov 29 07:05:17 2019 -0600
Check the return status of `_flush_memtable_async()` (#2332)
This commit also contains some adjustments to the forward declarations.
---
be/src/olap/delta_writer.cpp | 32 ++++++++++++++++----------------
be/src/olap/delta_writer.h | 10 +++++-----
be/src/olap/memtable.cpp | 8 ++++++--
be/src/olap/memtable.h | 10 ++++------
be/src/runtime/tablets_channel.h | 9 +--------
5 files changed, 32 insertions(+), 37 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 8df038b..85b30b8 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -21,9 +21,8 @@
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
#include "olap/rowset/rowset_factory.h"
-#include "olap/rowset/rowset_meta_manager.h"
-#include "olap/rowset/rowset_id_generator.h"
#include "olap/schema.h"
+#include "olap/schema_change.h"
#include "olap/storage_engine.h"
namespace doris {
@@ -160,8 +159,7 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
if (_mem_table->memory_usage() >= config::write_buffer_size) {
RETURN_NOT_OK(_flush_memtable_async());
// create a new memtable for new incoming data
- _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots,
- _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(), _mem_tracker.get()));
+ _reset_mem_table();
}
return OLAP_SUCCESS;
}
@@ -175,9 +173,8 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() {
// equal means there is no memtable in flush queue, just flush this memtable
VLOG(3) << "flush memtable to reduce mem consumption. memtable size: " << _mem_table->memory_usage()
<< ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id);
- _flush_memtable_async();
- _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots,
- _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(), _mem_tracker.get()));
+ RETURN_NOT_OK(_flush_memtable_async());
+ _reset_mem_table();
} else {
DCHECK(mem_consumption() > _mem_table->memory_usage());
// this means there should be at least one memtable in flush queue.
@@ -187,6 +184,12 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() {
return OLAP_SUCCESS;
}
+void DeltaWriter::_reset_mem_table() {
+ _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots,
+ _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(),
+ _mem_tracker.get()));
+}
+
OLAPStatus DeltaWriter::close() {
if (!_is_init) {
// if this delta writer is not initialized, but close() is called.
@@ -212,12 +215,11 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
LOG(WARNING) << "fail to build rowset";
return OLAP_ERR_MALLOC_ERROR;
}
- OLAPStatus res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id,
- _req.load_id, _cur_rowset, false);
+ OLAPStatus res = _storage_engine->txn_manager()->commit_txn(
+ _req.partition_id, _tablet, _req.txn_id, _req.load_id, _cur_rowset, false);
if (res != OLAP_SUCCESS && res != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
- LOG(WARNING) << "commit txn: " << _req.txn_id
- << " for rowset: " << _cur_rowset->rowset_id()
- << " failed.";
+ LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
+ << " for rowset: " << _cur_rowset->rowset_id();
return res;
}
@@ -236,8 +238,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
_req.load_id, _new_rowset, false);
if (res != OLAP_SUCCESS && res != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
- LOG(WARNING) << "save pending rowset failed. rowset_id:"
- << _new_rowset->rowset_id();
+ LOG(WARNING) << "Failed to save pending rowset. rowset_id:" << _new_rowset->rowset_id();
return res;
}
}
@@ -256,8 +257,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
_delta_written_success = true;
const FlushStatistic& stat = _flush_handler->get_stats();
- LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id()
- << ", stats: " << stat;
+ LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id() << ", stats: " << stat;
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 46726a0..d7cc427 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -19,12 +19,8 @@
#define DORIS_BE_SRC_DELTA_WRITER_H
#include "olap/tablet.h"
-#include "olap/schema_change.h"
-#include "runtime/descriptors.h"
-#include "runtime/tuple.h"
#include "gen_cpp/internal_service.pb.h"
#include "olap/rowset/rowset_writer.h"
-#include "util/blocking_queue.hpp"
namespace doris {
@@ -32,8 +28,10 @@ class FlushHandler;
class MemTable;
class MemTracker;
class Schema;
-class SegmentGroup;
class StorageEngine;
+class Tuple;
+class TupleDescriptor;
+class SlotDescriptor;
enum WriteType {
LOAD = 1,
@@ -87,6 +85,8 @@ private:
void _garbage_collection();
+ void _reset_mem_table();
+
private:
bool _is_init = false;
WriteRequest _req;
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 7edb6f2..30f5675 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -17,9 +17,13 @@
#include "olap/memtable.h"
+#include "common/object_pool.h"
#include "olap/rowset/column_data_writer.h"
+#include "olap/rowset/rowset_writer.h"
#include "olap/row_cursor.h"
#include "olap/row.h"
+#include "olap/schema.h"
+#include "runtime/tuple.h"
#include "util/runtime_profile.h"
#include "util/debug_util.h"
@@ -63,7 +67,7 @@ size_t MemTable::memory_usage() {
void MemTable::insert(Tuple* tuple) {
ContiguousRow row(_schema, _tuple_buf);
-
+
for (size_t i = 0; i < _slot_descs->size(); ++i) {
auto cell = row.cell(i);
const SlotDescriptor* slot = (*_slot_descs)[i];
@@ -93,7 +97,7 @@ OLAPStatus MemTable::flush() {
}
RETURN_NOT_OK(_rowset_writer->flush());
}
- DorisMetrics::memtable_flush_total.increment(1);
+ DorisMetrics::memtable_flush_total.increment(1);
DorisMetrics::memtable_flush_duration_us.increment(duration_ns / 1000);
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 1688dbd..b7e6933 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -18,16 +18,14 @@
#ifndef DORIS_BE_SRC_OLAP_MEMTABLE_H
#define DORIS_BE_SRC_OLAP_MEMTABLE_H
-#include "common/object_pool.h"
-#include "olap/schema.h"
#include "olap/skiplist.h"
-#include "runtime/tuple.h"
-#include "olap/rowset/rowset_writer.h"
namespace doris {
-class RowCursor;
class RowsetWriter;
+class ObjectPool;
+class Schema;
+class Tuple;
class MemTable {
public:
@@ -42,7 +40,7 @@ public:
OLAPStatus close();
private:
- int64_t _tablet_id;
+ int64_t _tablet_id;
Schema* _schema;
const TabletSchema* _tablet_schema;
TupleDescriptor* _tuple_desc;
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index cb63e36..348caa8 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -48,13 +48,6 @@ struct TabletsChannelKey {
std::string to_string() const;
};
-struct TabletsChannelKeyHasher {
- std::size_t operator()(const TabletsChannelKey& key) const {
- size_t seed = key.id.hash();
- return doris::HashUtil::hash(&key.index_id, sizeof(key.index_id), seed);
- }
-};
-
std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key);
class DeltaWriter;
@@ -83,7 +76,7 @@ public:
Status reduce_mem_usage();
int64_t mem_consumption() const { return _mem_tracker->consumption(); }
-
+
private:
// open all writer
Status _open_all_writers(const PTabletWriterOpenRequest& params);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org