You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/11 06:03:47 UTC

[incubator-doris] 06/09: [fix] check disk capacity before writing data (#8887)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 9c8d005abc2616e703f263fd6d013c647e42f58a
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Fri Apr 8 11:29:49 2022 +0800

    [fix] check disk capacity before writing data (#8887)
    
    1. We forgot to check disk capacity when writing data.
    2. TODO: the user-specified disk capacity is not used yet. We need to find a way to use it.
    3. Avoid printing too many compaction logs when there is no suitable version for compaction.
---
 be/src/olap/compaction.cpp                       |  1 +
 be/src/olap/data_dir.cpp                         |  1 -
 be/src/olap/delta_writer.cpp                     |  1 +
 be/src/olap/push_handler.cpp                     |  2 ++
 be/src/olap/rowset/beta_rowset_writer.cpp        |  2 +-
 be/src/olap/rowset/rowset_writer_context.h       |  5 +++++
 be/src/olap/rowset/segment_v2/segment_writer.cpp | 11 +++++++++--
 be/src/olap/rowset/segment_v2/segment_writer.h   |  4 +++-
 be/src/olap/schema_change.cpp                    |  3 +++
 be/src/olap/tablet.cpp                           | 16 ++++++++++++----
 be/src/olap/tablet_manager.cpp                   |  1 +
 11 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index edb7559752..f0dc268570 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -154,6 +154,7 @@ OLAPStatus Compaction::construct_output_rowset_writer() {
     context.tablet_id = _tablet->tablet_id();
     context.partition_id = _tablet->partition_id();
     context.tablet_schema_hash = _tablet->schema_hash();
+    context.data_dir = _tablet->data_dir();
     context.rowset_type = StorageEngine::instance()->default_rowset_type();
     if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
         context.rowset_type = BETA_ROWSET;
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 33f472906d..96aec01909 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -755,7 +755,6 @@ bool DataDir::reach_capacity_limit(int64_t incoming_data_size) {
     double used_pct = (_disk_capacity_bytes - _available_bytes + incoming_data_size) /
                       (double)_disk_capacity_bytes;
     int64_t left_bytes = _available_bytes - incoming_data_size;
-
     if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
         left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
         LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index c0b17b2105..9738f8f55b 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -135,6 +135,7 @@ OLAPStatus DeltaWriter::init() {
     writer_context.load_id = _req.load_id;
     writer_context.segments_overlap = OVERLAPPING;
     writer_context.parent_mem_tracker = _mem_tracker;
+    writer_context.data_dir = _tablet->data_dir();
     RETURN_NOT_OK(RowsetFactory::create_rowset_writer(writer_context, &_rowset_writer));
 
     _tablet_schema = &(_tablet->tablet_schema());
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index dd57cc2dc4..bb3149216b 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -226,6 +226,7 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr
         context.tablet_id = cur_tablet->tablet_id();
         context.partition_id = _request.partition_id;
         context.tablet_schema_hash = cur_tablet->schema_hash();
+        context.data_dir = cur_tablet->data_dir();
         context.rowset_type = StorageEngine::instance()->default_rowset_type();
         if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
             context.rowset_type = BETA_ROWSET;
@@ -412,6 +413,7 @@ OLAPStatus PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new
         context.tablet_id = cur_tablet->tablet_id();
         context.partition_id = _request.partition_id;
         context.tablet_schema_hash = cur_tablet->schema_hash();
+        context.data_dir = cur_tablet->data_dir();
         context.rowset_type = StorageEngine::instance()->default_rowset_type();
         if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
             context.rowset_type = BETA_ROWSET;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 4b68b39059..3cfeb27091 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -225,7 +225,7 @@ OLAPStatus BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::
     DCHECK(wblock != nullptr);
     segment_v2::SegmentWriterOptions writer_options;
     writer->reset(new segment_v2::SegmentWriter(wblock.get(), _num_segment,
-            _context.tablet_schema, writer_options, _context.parent_mem_tracker));
+            _context.tablet_schema, _context.data_dir, writer_options, _context.parent_mem_tracker));
     {
         std::lock_guard<SpinLock> l(_lock);
         _wblocks.push_back(std::move(wblock));
diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h
index 8c314f5dba..51dab55a34 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -67,6 +67,11 @@ struct RowsetWriterContext {
     // the default is set to INT32_MAX to avoid overflow issue when casting from uint32_t to int.
     // test cases can change this value to control flush timing
     uint32_t max_rows_per_segment = INT32_MAX;
+    // Not owned. Points to the data dir of this rowset,
+    // used for checking disk capacity when writing data to disk.
+    // ATTN: not supported for RowsetConvertor
+    // (because it is hard to refactor, and RowsetConvertor will be deprecated in the future).
+    DataDir* data_dir = nullptr;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index adbfef9694..669c770655 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -19,6 +19,7 @@
 
 #include "common/logging.h" // LOG
 #include "env/env.h"        // Env
+#include "olap/data_dir.h"
 #include "olap/fs/block_manager.h"
 #include "olap/row.h"                             // ContiguousRow
 #include "olap/row_cursor.h"                      // RowCursor
@@ -37,8 +38,10 @@ const char* k_segment_magic = "D0R1";
 const uint32_t k_segment_magic_length = 4;
 
 SegmentWriter::SegmentWriter(fs::WritableBlock* wblock, uint32_t segment_id,
-                             const TabletSchema* tablet_schema, const SegmentWriterOptions& opts, std::shared_ptr<MemTracker> parent)
-        : _segment_id(segment_id), _tablet_schema(tablet_schema), _opts(opts), _wblock(wblock), _mem_tracker(MemTracker::CreateTracker(
+                             const TabletSchema* tablet_schema, DataDir* data_dir,
+                             const SegmentWriterOptions& opts, std::shared_ptr<MemTracker> parent)
+        : _segment_id(segment_id), _tablet_schema(tablet_schema), _data_dir(data_dir),
+          _opts(opts), _wblock(wblock), _mem_tracker(MemTracker::CreateTracker(
                 -1, "Segment-" + std::to_string(segment_id), parent, false)) {
     CHECK_NOTNULL(_wblock);
 }
@@ -134,6 +137,10 @@ uint64_t SegmentWriter::estimate_segment_size() {
 }
 
 Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size) {
+    // check disk capacity
+    if (_data_dir != nullptr && _data_dir->reach_capacity_limit((int64_t) estimate_segment_size())) {
+        return Status::InternalError(fmt::format("disk {} exceed capacity limit.", _data_dir->path_hash()));
+    }
     for (auto& column_writer : _column_writers) {
         RETURN_IF_ERROR(column_writer->finish());
     }
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h
index d0600996ad..ebba30fe70 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -28,6 +28,7 @@
 
 namespace doris {
 
+class DataDir;
 class MemTracker;
 class RowBlock;
 class RowCursor;
@@ -53,7 +54,7 @@ struct SegmentWriterOptions {
 class SegmentWriter {
 public:
     explicit SegmentWriter(fs::WritableBlock* block, uint32_t segment_id,
-                           const TabletSchema* tablet_schema, const SegmentWriterOptions& opts, std::shared_ptr<MemTracker> parent = nullptr);
+                           const TabletSchema* tablet_schema, DataDir* data_dir, const SegmentWriterOptions& opts, std::shared_ptr<MemTracker> parent = nullptr);
     ~SegmentWriter();
 
     Status init(uint32_t write_mbytes_per_sec);
@@ -83,6 +84,7 @@ private:
 private:
     uint32_t _segment_id;
     const TabletSchema* _tablet_schema;
+    DataDir* _data_dir;
     SegmentWriterOptions _opts;
 
     // Not owned. owned by RowsetWriter
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index eb500a9bc8..0cd9ae7567 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1338,6 +1338,7 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& ro
     context.rowset_type = new_rowset_type;
     context.path_desc = new_tablet->tablet_path_desc();
     context.tablet_schema = &(new_tablet->tablet_schema());
+    context.data_dir = new_tablet->data_dir();
     context.rowset_state = VISIBLE;
     context.version = version;
     context.segments_overlap = segments_overlap;
@@ -1734,6 +1735,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
     writer_context.tablet_id = new_tablet->tablet_id();
     writer_context.partition_id = (*base_rowset)->partition_id();
     writer_context.tablet_schema_hash = new_tablet->schema_hash();
+    writer_context.data_dir = new_tablet->data_dir();
     writer_context.rowset_type = (*base_rowset)->rowset_meta()->rowset_type();
     if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
         writer_context.rowset_type = BETA_ROWSET;
@@ -1878,6 +1880,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
         writer_context.tablet_id = new_tablet->tablet_id();
         writer_context.partition_id = new_tablet->partition_id();
         writer_context.tablet_schema_hash = new_tablet->schema_hash();
+        writer_context.data_dir = new_tablet->data_dir();
         // linked schema change can't change rowset type, therefore we preserve rowset type in schema change now
         writer_context.rowset_type = rs_reader->rowset()->rowset_meta()->rowset_type();
         if (sc_params.new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 6516b81617..90b70672d1 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1333,11 +1333,15 @@ Status Tablet::prepare_compaction_and_calculate_permits(CompactionType compactio
         OLAPStatus res = _cumulative_compaction->prepare_compact();
         if (res != OLAP_SUCCESS) {
             set_last_cumu_compaction_failure_time(UnixMillis());
+            *permits = 0;
             if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION) {
                 DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
+                return Status::InternalError(fmt::format("prepare cumulative compaction with err: {}", res));
             }
-            *permits = 0;
-            return Status::InternalError(fmt::format("prepare compaction with err: {}", res));
+            // Return OK on OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION so that we don't
+            // print too many useless logs.
+            // Because permits is set to 0, nothing will be done even though we return OK here.
+            return Status::OK();
         }
         compaction_rowsets = _cumulative_compaction->get_input_rowsets();
     } else {
@@ -1358,11 +1362,15 @@ Status Tablet::prepare_compaction_and_calculate_permits(CompactionType compactio
         OLAPStatus res = _base_compaction->prepare_compact();
         if (res != OLAP_SUCCESS) {
             set_last_base_compaction_failure_time(UnixMillis());
+            *permits = 0;
             if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) {
                 DorisMetrics::instance()->base_compaction_request_failed->increment(1);
+                return Status::InternalError(fmt::format("prepare base compaction with err: {}", res));
             }
-            *permits = 0;
-            return Status::InternalError(fmt::format("prepare compaction with err: {}", res));
+            // Return OK on OLAP_ERR_BE_NO_SUITABLE_VERSION so that we don't
+            // print too many useless logs.
+            // Because permits is set to 0, nothing will be done even though we return OK here.
+            return Status::OK();
         }
         compaction_rowsets = _base_compaction->get_input_rowsets();
     }
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index e997d74101..31c74c650f 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -1195,6 +1195,7 @@ OLAPStatus TabletManager::_create_initial_rowset_unlocked(const TCreateTabletReq
             context.tablet_id = tablet->tablet_id();
             context.partition_id = tablet->partition_id();
             context.tablet_schema_hash = tablet->schema_hash();
+            context.data_dir = tablet->data_dir();
             if (!request.__isset.storage_format ||
                 request.storage_format == TStorageFormat::DEFAULT) {
                 context.rowset_type = StorageEngine::instance()->default_rowset_type();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org