You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/11/29 13:05:24 UTC

[incubator-doris] branch master updated: Check the return status of `_flush_memtable_async()` (#2332)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c5f7f7e  Check the return status of `_flush_memtable_async()` (#2332)
c5f7f7e is described below

commit c5f7f7e0f4d5baa841054880155d9b0632c92411
Author: LingBin <li...@gmail.com>
AuthorDate: Fri Nov 29 07:05:17 2019 -0600

    Check the return status of `_flush_memtable_async()` (#2332)
    
    This commit also contains some adjustments of the forward declaration
---
 be/src/olap/delta_writer.cpp     | 32 ++++++++++++++++----------------
 be/src/olap/delta_writer.h       | 10 +++++-----
 be/src/olap/memtable.cpp         |  8 ++++++--
 be/src/olap/memtable.h           | 10 ++++------
 be/src/runtime/tablets_channel.h |  9 +--------
 5 files changed, 32 insertions(+), 37 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 8df038b..85b30b8 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -21,9 +21,8 @@
 #include "olap/memtable.h"
 #include "olap/memtable_flush_executor.h"
 #include "olap/rowset/rowset_factory.h"
-#include "olap/rowset/rowset_meta_manager.h"
-#include "olap/rowset/rowset_id_generator.h"
 #include "olap/schema.h"
+#include "olap/schema_change.h"
 #include "olap/storage_engine.h"
 
 namespace doris {
@@ -160,8 +159,7 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
     if (_mem_table->memory_usage() >= config::write_buffer_size) {
         RETURN_NOT_OK(_flush_memtable_async());
         // create a new memtable for new incoming data
-        _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots,
-                    _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(), _mem_tracker.get()));
+        _reset_mem_table();
     }
     return OLAP_SUCCESS;
 }
@@ -175,9 +173,8 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() {
         // equal means there is no memtable in flush queue, just flush this memtable
         VLOG(3) << "flush memtable to reduce mem consumption. memtable size: " << _mem_table->memory_usage()
                 << ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id);
-        _flush_memtable_async();
-        _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots,
-                    _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(), _mem_tracker.get()));
+        RETURN_NOT_OK(_flush_memtable_async());
+        _reset_mem_table();
     } else {
         DCHECK(mem_consumption() > _mem_table->memory_usage());
         // this means there should be at least one memtable in flush queue.
@@ -187,6 +184,12 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() {
     return OLAP_SUCCESS;
 }
 
+void DeltaWriter::_reset_mem_table() {
+    _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots,
+                                  _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(),
+                                  _mem_tracker.get()));
+}
+
 OLAPStatus DeltaWriter::close() {
     if (!_is_init) {
         // if this delta writer is not initialized, but close() is called.
@@ -212,12 +215,11 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
         LOG(WARNING) << "fail to build rowset";
         return OLAP_ERR_MALLOC_ERROR;
     }
-    OLAPStatus res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id,
-        _req.load_id, _cur_rowset, false);
+    OLAPStatus res = _storage_engine->txn_manager()->commit_txn(
+            _req.partition_id, _tablet, _req.txn_id, _req.load_id, _cur_rowset, false);
     if (res != OLAP_SUCCESS && res != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
-        LOG(WARNING) << "commit txn: " << _req.txn_id
-                     << " for rowset: " << _cur_rowset->rowset_id()
-                     << " failed.";
+        LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
+                     << " for rowset: " << _cur_rowset->rowset_id();
         return res;
     }
 
@@ -236,8 +238,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
             _req.load_id, _new_rowset, false);
 
         if (res != OLAP_SUCCESS && res != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
-            LOG(WARNING) << "save pending rowset failed. rowset_id:"
-                         << _new_rowset->rowset_id();
+            LOG(WARNING) << "Failed to save pending rowset. rowset_id:" << _new_rowset->rowset_id();
             return res;
         }
     }
@@ -256,8 +257,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
     _delta_written_success = true;
 
     const FlushStatistic& stat = _flush_handler->get_stats();
-    LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id()
-        << ", stats: " << stat;
+    LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id() << ", stats: " << stat;
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 46726a0..d7cc427 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -19,12 +19,8 @@
 #define DORIS_BE_SRC_DELTA_WRITER_H
 
 #include "olap/tablet.h"
-#include "olap/schema_change.h"
-#include "runtime/descriptors.h"
-#include "runtime/tuple.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "olap/rowset/rowset_writer.h"
-#include "util/blocking_queue.hpp"
 
 namespace doris {
 
@@ -32,8 +28,10 @@ class FlushHandler;
 class MemTable;
 class MemTracker;
 class Schema;
-class SegmentGroup;
 class StorageEngine;
+class Tuple;
+class TupleDescriptor;
+class SlotDescriptor;
 
 enum WriteType {
     LOAD = 1,
@@ -87,6 +85,8 @@ private:
 
     void _garbage_collection();
 
+    void _reset_mem_table();
+
 private:
     bool _is_init = false;
     WriteRequest _req;
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 7edb6f2..30f5675 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -17,9 +17,13 @@
 
 #include "olap/memtable.h"
 
+#include "common/object_pool.h"
 #include "olap/rowset/column_data_writer.h"
+#include "olap/rowset/rowset_writer.h"
 #include "olap/row_cursor.h"
 #include "olap/row.h"
+#include "olap/schema.h"
+#include "runtime/tuple.h"
 #include "util/runtime_profile.h"
 #include "util/debug_util.h"
 
@@ -63,7 +67,7 @@ size_t MemTable::memory_usage() {
 
 void MemTable::insert(Tuple* tuple) {
     ContiguousRow row(_schema, _tuple_buf);
-    
+
     for (size_t i = 0; i < _slot_descs->size(); ++i) {
         auto cell = row.cell(i);
         const SlotDescriptor* slot = (*_slot_descs)[i];
@@ -93,7 +97,7 @@ OLAPStatus MemTable::flush() {
         }
         RETURN_NOT_OK(_rowset_writer->flush());
     }
-    DorisMetrics::memtable_flush_total.increment(1); 
+    DorisMetrics::memtable_flush_total.increment(1);
     DorisMetrics::memtable_flush_duration_us.increment(duration_ns / 1000);
     return OLAP_SUCCESS;
 }
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 1688dbd..b7e6933 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -18,16 +18,14 @@
 #ifndef DORIS_BE_SRC_OLAP_MEMTABLE_H
 #define DORIS_BE_SRC_OLAP_MEMTABLE_H
 
-#include "common/object_pool.h"
-#include "olap/schema.h"
 #include "olap/skiplist.h"
-#include "runtime/tuple.h"
-#include "olap/rowset/rowset_writer.h"
 
 namespace doris {
 
-class RowCursor;
 class RowsetWriter;
+class ObjectPool;
+class Schema;
+class Tuple;
 
 class MemTable {
 public:
@@ -42,7 +40,7 @@ public:
     OLAPStatus close();
 
 private:
-    int64_t _tablet_id; 
+    int64_t _tablet_id;
     Schema* _schema;
     const TabletSchema* _tablet_schema;
     TupleDescriptor* _tuple_desc;
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index cb63e36..348caa8 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -48,13 +48,6 @@ struct TabletsChannelKey {
     std::string to_string() const;
 };
 
-struct TabletsChannelKeyHasher {
-    std::size_t operator()(const TabletsChannelKey& key) const {
-        size_t seed = key.id.hash();
-        return doris::HashUtil::hash(&key.index_id, sizeof(key.index_id), seed);
-    }
-};
-
 std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key);
 
 class DeltaWriter;
@@ -83,7 +76,7 @@ public:
     Status reduce_mem_usage();
 
     int64_t mem_consumption() const { return _mem_tracker->consumption(); }
-    
+
 private:
     // open all writer
     Status _open_all_writers(const PTabletWriterOpenRequest& params);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org