You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2023/06/01 02:25:13 UTC

[doris] branch master updated: [refactor](dynamic table) Make segment_writer unaware of dynamic schema, and ensure parsing is exception-safe. (#19594)

This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e21318834 [refactor](dynamic table) Make segment_writer unaware of dynamic schema, and ensure parsing is exception-safe. (#19594)
9e21318834 is described below

commit 9e21318834d5fd886def3cf4a81641f5d55ff8fd
Author: lihangyu <15...@163.com>
AuthorDate: Thu Jun 1 10:25:04 2023 +0800

    [refactor](dynamic table) Make segment_writer unaware of dynamic schema, and ensure parsing is exception-safe. (#19594)
    
    1. make ColumnObject exception safe
    2. introduce FlushContext and construct schema at memtable flush stage to make segment independent from dynamic schema
    3. add more test cases
---
 be/src/olap/memtable.cpp                           |   80 +-
 be/src/olap/memtable.h                             |    4 +-
 be/src/olap/rowset/beta_rowset_writer.cpp          |   23 +-
 be/src/olap/rowset/beta_rowset_writer.h            |   12 +-
 be/src/olap/rowset/rowset_writer.h                 |   15 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |   72 +-
 be/src/olap/rowset/segment_v2/segment_writer.h     |   12 +-
 be/src/vec/columns/column_object.cpp               |  125 +-
 be/src/vec/columns/column_object.h                 |   26 +-
 be/src/vec/common/schema_util.cpp                  |   18 +-
 be/src/vec/common/schema_util.h                    |    2 +-
 be/src/vec/data_types/convert_field_to_type.cpp    |   50 +-
 be/src/vec/data_types/convert_field_to_type.h      |    4 +-
 be/src/vec/data_types/get_least_supertype.cpp      |  210 ++-
 be/src/vec/data_types/get_least_supertype.h        |    8 +-
 be/src/vec/exec/format/json/new_json_reader.cpp    |   35 +-
 be/src/vec/json/json_parser.cpp                    |    5 +-
 be/src/vec/json/parse2column.cpp                   |   40 +-
 be/src/vec/json/parse2column.h                     |   10 +-
 be/src/vec/olap/olap_data_convertor.cpp            |    8 +-
 lowercase.json                                     |    5 +
 .../data/dynamic_table_p0/floating_point.json      |    4 +
 .../data/dynamic_table_p0/floating_point2.json     |    4 +
 .../data/dynamic_table_p0/floating_point3.json     |    4 +
 .../data/dynamic_table_p0/invalid_dimension.json   |    4 +
 .../data/dynamic_table_p0/invalid_format.json      |    3 +
 regression-test/data/dynamic_table_p0/load.out     | 1939 ++++++++++++++++++++
 .../data/dynamic_table_p0/lowercase.json           |    4 +
 regression-test/data/dynamic_table_p0/sql/q05.out  |   14 +-
 .../data/dynamic_table_p0/uppercase.json           |    4 +
 .../suites/dynamic_table_p0/load.groovy            |   77 +
 31 files changed, 2481 insertions(+), 340 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 6bc82b052c..c398a22b15 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -45,15 +45,17 @@
 #include "util/doris_metrics.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
+#include "util/string_util.h"
 #include "vec/aggregate_functions/aggregate_function_reader.h"
 #include "vec/aggregate_functions/aggregate_function_simple_factory.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_object.h"
 #include "vec/columns/column_string.h"
 #include "vec/common/assert_cast.h"
+#include "vec/common/schema_util.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/data_types/data_type.h"
-#include "vec/data_types/serde/data_type_serde.h"
+#include "vec/data_types/data_type_factory.hpp"
 #include "vec/json/path_in_data.h"
 #include "vec/jsonb/serialize.h"
 
@@ -475,12 +477,14 @@ Status MemTable::_do_flush() {
         _aggregate<true>();
     }
     vectorized::Block block = _output_mutable_block.to_block();
+    FlushContext ctx;
+    ctx.block = &block;
     if (_tablet_schema->is_dynamic_schema()) {
         // Unfold variant column
-        unfold_variant_column(block);
+        RETURN_IF_ERROR(unfold_variant_column(block, &ctx));
     }
     SCOPED_RAW_TIMER(&_stat.segment_writer_ns);
-    RETURN_IF_ERROR(_rowset_writer->flush_single_memtable(&block, &_flush_size));
+    RETURN_IF_ERROR(_rowset_writer->flush_single_memtable(&block, &_flush_size, &ctx));
     return Status::OK();
 }
 
@@ -488,27 +492,77 @@ Status MemTable::close() {
     return flush();
 }
 
-void MemTable::unfold_variant_column(vectorized::Block& block) {
+Status MemTable::unfold_variant_column(vectorized::Block& block, FlushContext* ctx) {
     if (block.rows() == 0) {
-        return;
+        return Status::OK();
     }
+
+    // Sanitize the block so that its column types exactly match the frontend meta
+    vectorized::schema_util::FullBaseSchemaView schema_view;
+    schema_view.table_id = _tablet_schema->table_id();
     vectorized::ColumnWithTypeAndName* variant_column =
             block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
     if (!variant_column) {
-        return;
+        return Status::OK();
     }
-    // remove it
+    auto base_column = variant_column->column;
     vectorized::ColumnObject& object_column =
-            assert_cast<vectorized::ColumnObject&>(variant_column->column->assume_mutable_ref());
-    // extend
+            assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
+    if (object_column.empty()) {
+        block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+        return Status::OK();
+    }
+    object_column.finalize();
+    // Has extended columns
+    RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
+    // Dynamic Block consists of two parts, dynamic part of columns and static part of columns
+    //  static   dynamic
+    // | ----- | ------- |
+    // The static ones are the original _tablet_schema columns
+    TabletSchemaSPtr flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+    vectorized::Block flush_block(std::move(block));
+    // The dynamic ones are auto-generated and extended; append them to flush_block
     for (auto& entry : object_column.get_subcolumns()) {
-        if (entry->path.get_path() == vectorized::ColumnObject::COLUMN_NAME_DUMMY) {
+        const std::string& column_name = entry->path.get_path();
+        auto column_iter = schema_view.column_name_to_column.find(column_name);
+        if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
+            // Column maybe dropped by light weight schema change DDL
             continue;
         }
-        block.insert({entry->data.get_finalized_column().get_ptr(),
-                      entry->data.get_least_common_type(), entry->path.get_path()});
+        TabletColumn column(column_iter->second);
+        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(
+                column, column.is_nullable());
+        // Dynamically generated columns do not appear in the original tablet schema
+        if (_tablet_schema->field_index(column.name()) < 0) {
+            flush_schema->append_column(column);
+            flush_block.insert({data_type->create_column(), data_type, column.name()});
+        }
+    }
+
+    // Ensure all columns are present at this schema version. Otherwise the following scenario can occur:
+    //  Load1 -> version(10) with schema [a, b, c, d, e]; d & e are newly added columns and the schema version became 10
+    //  Load2 -> version(10) with schema [a, b, c]; it has no extended columns and fetched the schema at version 10
+    //  Load2 will persist meta with [a, b, c] but Load1 will persist meta with [a, b, c, d, e]
+    // So we should make sure that rowsets at the same schema version always contain the same number of columns,
+    // so that every column at schema_version is in either _tablet_schema or schema_change_recorder
+    for (const auto& [name, column] : schema_view.column_name_to_column) {
+        if (_tablet_schema->field_index(name) == -1) {
+            const auto& tcolumn = schema_view.column_name_to_column[name];
+            TabletColumn new_column(tcolumn);
+            _rowset_writer->mutable_schema_change_recorder()->add_extended_columns(
+                    column, schema_view.schema_version);
+        }
     }
-    block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+
+    // Last schema alignment before flushing to disk, since the schema may have changed before this procedure.
+    // E.g. add columnA(INT) -> drop columnA -> add columnA(Double): columnA could then be of type `Double`,
+    // and unfold will cast to the Double type
+    RETURN_IF_ERROR(vectorized::schema_util::unfold_object(
+            flush_block.get_position_by_name(BeConsts::DYNAMIC_COLUMN_NAME), flush_block, true));
+    flush_block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+    ctx->flush_schema = flush_schema;
+    block.swap(flush_block);
+    return Status::OK();
 }
 
 void MemTable::serialize_block_to_row_column(vectorized::Block& block) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index b99127bbf6..95d41df62d 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -44,6 +44,7 @@ class SlotDescriptor;
 class TabletSchema;
 class TupleDescriptor;
 enum KeysType : int;
+struct FlushContext;
 
 // row pos in _input_mutable_block
 struct RowInBlock {
@@ -163,7 +164,8 @@ private:
     // Eg. [A | B | C | (D, E, F)]
     // After unfold block structure changed to -> [A | B | C | D | E | F]
     // The expanded D, E, F is dynamic part of the block
-    void unfold_variant_column(vectorized::Block& block);
+    // The flushed Block's column types should exactly match those in the frontend meta
+    Status unfold_variant_column(vectorized::Block& block, FlushContext* ctx);
 
 private:
     TabletSharedPtr _tablet;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 76a2a2d4a6..5358732daa 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -41,6 +41,7 @@
 #include "olap/olap_define.h"
 #include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_writer.h"
 #include "olap/rowset/segment_v2/inverted_index_cache.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/rowset/segment_v2/segment.h"
@@ -132,7 +133,9 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
         return Status::OK();
     }
     if (UNLIKELY(_segment_writer == nullptr)) {
-        RETURN_IF_ERROR(_create_segment_writer(&_segment_writer, block));
+        FlushContext ctx;
+        ctx.block = block;
+        RETURN_IF_ERROR(_create_segment_writer(&_segment_writer, &ctx));
     }
     return _add_block(block, &_segment_writer);
 }
@@ -406,7 +409,8 @@ Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
 }
 
 Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
-                                    std::unique_ptr<segment_v2::SegmentWriter>* segment_writer) {
+                                    std::unique_ptr<segment_v2::SegmentWriter>* segment_writer,
+                                    const FlushContext* flush_ctx) {
     size_t block_size_in_bytes = block->bytes();
     size_t block_row_num = block->rows();
     size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
@@ -417,7 +421,7 @@ Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
         if (UNLIKELY(max_row_add < 1)) {
             // no space for another single row, need flush now
             RETURN_IF_ERROR(_flush_segment_writer(segment_writer));
-            RETURN_IF_ERROR(_create_segment_writer(segment_writer, block));
+            RETURN_IF_ERROR(_create_segment_writer(segment_writer, flush_ctx));
             max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
             DCHECK(max_row_add > 0);
         }
@@ -462,13 +466,14 @@ Status BetaRowsetWriter::flush() {
     return Status::OK();
 }
 
-Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block, int64* flush_size) {
+Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block, int64* flush_size,
+                                               const FlushContext* ctx) {
     if (block->rows() == 0) {
         return Status::OK();
     }
 
     std::unique_ptr<segment_v2::SegmentWriter> writer;
-    RETURN_IF_ERROR(_create_segment_writer(&writer, block));
+    RETURN_IF_ERROR(_create_segment_writer(&writer, ctx));
     RETURN_IF_ERROR(_add_block(block, &writer));
     RETURN_IF_ERROR(_flush_segment_writer(&writer, flush_size));
     RETURN_IF_ERROR(_segcompaction_if_necessary());
@@ -666,7 +671,7 @@ RowsetSharedPtr BetaRowsetWriter::build_tmp() {
 
 Status BetaRowsetWriter::_do_create_segment_writer(
         std::unique_ptr<segment_v2::SegmentWriter>* writer, bool is_segcompaction, int64_t begin,
-        int64_t end, const vectorized::Block* block) {
+        int64_t end, const FlushContext* flush_ctx) {
     std::string path;
     int32_t segment_id = 0;
     if (is_segcompaction) {
@@ -715,7 +720,7 @@ Status BetaRowsetWriter::_do_create_segment_writer(
             std::lock_guard<SpinLock> l(_lock);
             _file_writers.push_back(std::move(file_writer));
         }
-        auto s = (*writer)->init(block);
+        auto s = (*writer)->init(flush_ctx);
         if (!s.ok()) {
             LOG(WARNING) << "failed to init segment writer: " << s.to_string();
             writer->reset(nullptr);
@@ -727,7 +732,7 @@ Status BetaRowsetWriter::_do_create_segment_writer(
 }
 
 Status BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
-                                                const vectorized::Block* block) {
+                                                const FlushContext* flush_ctx) {
     size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
     if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
         LOG(WARNING) << "too many segments in rowset."
@@ -738,7 +743,7 @@ Status BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::Segm
                      << " _num_segcompacted:" << _num_segcompacted;
         return Status::Error<TOO_MANY_SEGMENTS>();
     } else {
-        return _do_create_segment_writer(writer, false, -1, -1, block);
+        return _do_create_segment_writer(writer, false, -1, -1, flush_ctx);
     }
 }
 
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index b7d5b10ee6..389c1bbede 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -79,7 +79,8 @@ public:
 
     // Return the file size flushed to disk in "flush_size"
     // This method is thread-safe.
-    Status flush_single_memtable(const vectorized::Block* block, int64_t* flush_size) override;
+    Status flush_single_memtable(const vectorized::Block* block, int64_t* flush_size,
+                                 const FlushContext* ctx = nullptr) override;
 
     RowsetSharedPtr build() override;
 
@@ -106,7 +107,7 @@ public:
     int32_t get_atomic_num_segment() const override { return _num_segment.load(); }
 
     // Maybe modified by local schema change
-    vectorized::schema_util::LocalSchemaChangeRecorder* mutable_schema_change_recorder() {
+    vectorized::schema_util::LocalSchemaChangeRecorder* mutable_schema_change_recorder() override {
         return _context.schema_change_recorder.get();
     }
 
@@ -122,13 +123,14 @@ public:
 
 private:
     Status _add_block(const vectorized::Block* block,
-                      std::unique_ptr<segment_v2::SegmentWriter>* writer);
+                      std::unique_ptr<segment_v2::SegmentWriter>* writer,
+                      const FlushContext* flush_ctx = nullptr);
 
     Status _do_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
                                      bool is_segcompaction, int64_t begin, int64_t end,
-                                     const vectorized::Block* block = nullptr);
+                                     const FlushContext* ctx = nullptr);
     Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
-                                  const vectorized::Block* block = nullptr);
+                                  const FlushContext* ctx = nullptr);
     Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
                                  int64_t* flush_size = nullptr);
     void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 1bdc4daa2d..6bfa9f6e9a 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -20,16 +20,25 @@
 #include <gen_cpp/olap_file.pb.h>
 #include <gen_cpp/types.pb.h>
 
+#include "common/factory_creator.h"
 #include "gutil/macros.h"
 #include "olap/column_mapping.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_writer_context.h"
+#include "olap/tablet_schema.h"
 #include "vec/core/block.h"
 
 namespace doris {
 
 class MemTable;
 
+// Context for single memtable flush
+struct FlushContext {
+    ENABLE_FACTORY_CREATOR(FlushContext);
+    TabletSchemaSPtr flush_schema = nullptr;
+    const vectorized::Block* block = nullptr;
+};
+
 class RowsetWriter {
 public:
     RowsetWriter() = default;
@@ -59,7 +68,8 @@ public:
     }
     virtual Status final_flush() { return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(); }
 
-    virtual Status flush_single_memtable(const vectorized::Block* block, int64_t* flush_size) {
+    virtual Status flush_single_memtable(const vectorized::Block* block, int64_t* flush_size,
+                                         const FlushContext* ctx = nullptr) {
         return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>();
     }
 
@@ -95,6 +105,9 @@ public:
 
     virtual void set_segment_start_id(int num_segment) { LOG(FATAL) << "not supported!"; }
 
+    virtual vectorized::schema_util::LocalSchemaChangeRecorder*
+    mutable_schema_change_recorder() = 0;
+
 private:
     DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
 };
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 3feb778e4a..f2e52736db 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -35,12 +35,15 @@
 #include "olap/key_coder.h"
 #include "olap/olap_common.h"
 #include "olap/primary_key_index.h"
-#include "olap/row_cursor.h"                      // IWYU pragma: keep
+#include "olap/row_cursor.h" // IWYU pragma: keep
+#include "olap/row_cursor.h" // RowCursor
+#include "olap/rowset/rowset_writer.h"
 #include "olap/rowset/rowset_writer_context.h"    // RowsetWriterContext
 #include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
 #include "olap/rowset/segment_v2/page_io.h"
 #include "olap/rowset/segment_v2/page_pointer.h"
 #include "olap/short_key_index.h"
+#include "olap/tablet_schema.h"
 #include "runtime/memory/mem_tracker.h"
 #include "service/point_query_executor.h"
 #include "util/coding.h"
@@ -113,30 +116,20 @@ void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t column_id,
     }
 }
 
-Status SegmentWriter::init(const vectorized::Block* block) {
+Status SegmentWriter::init(const FlushContext* flush_ctx) {
     std::vector<uint32_t> column_ids;
     int column_cnt = _tablet_schema->num_columns();
-    if (block && !_tablet_schema->is_partial_update()) {
-        // partial update only contain several columns
-        column_cnt = block->columns();
+    if (flush_ctx && flush_ctx->flush_schema) {
+        column_cnt = flush_ctx->flush_schema->num_columns();
     }
     for (uint32_t i = 0; i < column_cnt; ++i) {
         column_ids.emplace_back(i);
     }
-    return init(column_ids, true, block);
-}
-
-// Dynamic table with extended columns and directly write from delta writer
-// Compaction/SchemaChange path will use the latest schema version of rowset
-// as it's shcema, so it's block is not from dynamic table load procedure.
-// If it is a dynamic table load procedure we should handle auto generated columns.
-bool SegmentWriter::_should_create_writers_with_dynamic_block(size_t num_columns_in_block) {
-    return _tablet_schema->is_dynamic_schema() && _opts.is_direct_write &&
-           num_columns_in_block > _tablet_schema->columns().size();
+    return init(column_ids, true, flush_ctx);
 }
 
 Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key,
-                           const vectorized::Block* block) {
+                           const FlushContext* flush_ctx) {
     DCHECK(_column_writers.empty());
     DCHECK(_column_ids.empty());
     _has_key = has_key;
@@ -144,7 +137,8 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key,
     _column_ids.insert(_column_ids.end(), col_ids.begin(), col_ids.end());
     _olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>();
     _opts.compression_type =
-            (block == nullptr || block->bytes() > config::segment_compression_threshold_kb * 1024)
+            (flush_ctx == nullptr || flush_ctx->block == nullptr ||
+             flush_ctx->block->bytes() > config::segment_compression_threshold_kb * 1024)
                     ? _tablet_schema->compression_type()
                     : NO_COMPRESSION;
     auto create_column_writer = [&](uint32_t cid, const auto& column) -> auto {
@@ -240,10 +234,10 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key,
         return Status::OK();
     };
 
-    if (block && _should_create_writers_with_dynamic_block(block->columns())) {
-        RETURN_IF_ERROR(_create_writers_with_dynamic_block(block, create_column_writer));
+    if (flush_ctx && flush_ctx->flush_schema) {
+        RETURN_IF_ERROR(_create_writers(*flush_ctx->flush_schema, col_ids, create_column_writer));
     } else {
-        RETURN_IF_ERROR(_create_writers(create_column_writer));
+        RETURN_IF_ERROR(_create_writers(*_tablet_schema, col_ids, create_column_writer));
     }
 
     // we don't need the short key index for unique key merge on write table.
@@ -266,41 +260,11 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key,
 }
 
 Status SegmentWriter::_create_writers(
+        const TabletSchema& tablet_schema, const std::vector<uint32_t>& col_ids,
         std::function<Status(uint32_t, const TabletColumn&)> create_column_writer) {
-    _olap_data_convertor->reserve(_column_ids.size());
-    for (auto& cid : _column_ids) {
-        RETURN_IF_ERROR(create_column_writer(cid, _tablet_schema->column(cid)));
-    }
-    return Status::OK();
-}
-
-// Dynamic Block consists of two parts, dynamic part of columns and static part of columns
-//  static   dynamic
-// | ----- | ------- |
-// the static ones are original _tablet_schame columns
-// the dynamic ones are auto generated and extended from file scan
-Status SegmentWriter::_create_writers_with_dynamic_block(
-        const vectorized::Block* block,
-        std::function<Status(uint32_t, const TabletColumn&)> create_column_writer) {
-    // generate writers from schema and extended schema info
-    _olap_data_convertor->reserve(block->columns());
-    // new columns added, query column info from Master
-    vectorized::schema_util::FullBaseSchemaView schema_view;
-    CHECK(block->columns() > _tablet_schema->num_columns());
-    schema_view.table_id = _tablet_schema->table_id();
-    RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
-    // create writers with static columns
-    for (size_t i = 0; i < _tablet_schema->columns().size(); ++i) {
-        create_column_writer(i, _tablet_schema->column(i));
-    }
-    // create writers with auto generated columns
-    for (size_t i = _tablet_schema->columns().size(); i < block->columns(); ++i) {
-        const auto& column_type_name = block->get_by_position(i);
-        const auto& tcolumn = schema_view.column_name_to_column[column_type_name.name];
-        TabletColumn new_column(tcolumn);
-        RETURN_IF_ERROR(create_column_writer(i, new_column));
-        _opts.rowset_ctx->schema_change_recorder->add_extended_columns(new_column,
-                                                                       schema_view.schema_version);
+    _olap_data_convertor->reserve(col_ids.size());
+    for (auto& cid : col_ids) {
+        RETURN_IF_ERROR(create_column_writer(cid, tablet_schema.column(cid)));
     }
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h
index ea97958825..09a3576cf5 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -35,6 +35,7 @@
 #include "gutil/macros.h"
 #include "gutil/strings/substitute.h"
 #include "olap/olap_define.h"
+#include "olap/rowset/rowset_writer.h"
 #include "olap/rowset/segment_v2/column_writer.h"
 #include "olap/tablet.h"
 #include "olap/tablet_schema.h"
@@ -57,6 +58,7 @@ class ShortKeyIndexBuilder;
 class PrimaryKeyIndexBuilder;
 class KeyCoder;
 struct RowsetWriterContext;
+struct FlushContext;
 
 namespace io {
 class FileWriter;
@@ -89,11 +91,11 @@ public:
                            std::shared_ptr<MowContext> mow_context);
     ~SegmentWriter();
 
-    Status init(const vectorized::Block* block = nullptr);
+    Status init(const FlushContext* flush_ctx = nullptr);
 
     // for vertical compaction
     Status init(const std::vector<uint32_t>& col_ids, bool has_key,
-                const vectorized::Block* block = nullptr);
+                const FlushContext* flush_ctx = nullptr);
 
     template <typename RowType>
     Status append_row(const RowType& row);
@@ -133,11 +135,9 @@ public:
                                 const std::vector<bool>& use_default_flag, bool has_default);
 
 private:
-    Status _create_writers_with_dynamic_block(
-            const vectorized::Block* block,
-            std::function<Status(uint32_t, const TabletColumn&)> writer_creator);
-    Status _create_writers(std::function<Status(uint32_t, const TabletColumn&)> writer_creator);
     DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
+    Status _create_writers(const TabletSchema& tablet_schema, const std::vector<uint32_t>& col_ids,
+                           std::function<Status(uint32_t, const TabletColumn&)> writer_creator);
     Status _write_data();
     Status _write_ordinal_index();
     Status _write_zone_map();
diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp
index 89aabbab94..7830e3fe8f 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -29,6 +29,8 @@
 #include <map>
 #include <optional>
 
+#include "common/exception.h"
+#include "common/status.h"
 #include "vec/columns/column_array.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/columns_number.h"
@@ -158,7 +160,6 @@ private:
 /// Returns 0 for scalar fields.
 class FieldVisitorToNumberOfDimensions : public StaticVisitor<size_t> {
 public:
-    explicit FieldVisitorToNumberOfDimensions(Status* st) : _st(st) {}
     size_t operator()(const Array& x) const {
         const size_t size = x.size();
         std::optional<size_t> dimensions;
@@ -172,8 +173,8 @@ public:
             if (!dimensions) {
                 dimensions = current_dimensions;
             } else if (current_dimensions != *dimensions) {
-                *_st = Status::InvalidArgument(
-                        "Number of dimensions mismatched among array elements");
+                throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
+                                       "Number of dimensions mismatched among array elements");
                 return 0;
             }
         }
@@ -183,9 +184,6 @@ public:
     size_t operator()(const T&) const {
         return 0;
     }
-
-private:
-    mutable Status* _st;
 };
 
 /// Visitor that allows to get type of scalar field
@@ -217,9 +215,16 @@ public:
         return 0;
     }
     size_t operator()(const Int64& x) {
-        // Only Int64 at present
+        // // Only Int64 | Int32 at present
+        // field_types.insert(FieldType::Int64);
+        // type_indexes.insert(TypeIndex::Int64);
+        // return 0;
         field_types.insert(FieldType::Int64);
-        type_indexes.insert(TypeIndex::Int64);
+        if (x <= std::numeric_limits<Int32>::max() && x >= std::numeric_limits<Int32>::min()) {
+            type_indexes.insert(TypeIndex::Int32);
+        } else {
+            type_indexes.insert(TypeIndex::Int64);
+        }
         return 0;
     }
     size_t operator()(const Null&) {
@@ -233,8 +238,8 @@ public:
         type_indexes.insert(TypeId<NearestFieldType<T>>::value);
         return 0;
     }
-    Status get_scalar_type(DataTypePtr* type) const {
-        return get_least_supertype(type_indexes, type, true /*compatible with string type*/);
+    void get_scalar_type(DataTypePtr* type) const {
+        get_least_supertype(type_indexes, type, true /*compatible with string type*/);
     }
     bool contain_nulls() const { return have_nulls; }
     bool need_convert_field() const { return field_types.size() > 1; }
@@ -246,21 +251,18 @@ private:
 };
 
 } // namespace
-Status get_field_info(const Field& field, FieldInfo* info) {
+void get_field_info(const Field& field, FieldInfo* info) {
     FieldVisitorToScalarType to_scalar_type_visitor;
     apply_visitor(to_scalar_type_visitor, field);
     DataTypePtr type = nullptr;
-    RETURN_IF_ERROR(to_scalar_type_visitor.get_scalar_type(&type));
+    to_scalar_type_visitor.get_scalar_type(&type);
     // array item's dimension may missmatch, eg. [1, 2, [1, 2, 3]]
-    Status num_to_dimensions_status;
     *info = {
             type,
             to_scalar_type_visitor.contain_nulls(),
             to_scalar_type_visitor.need_convert_field(),
-            apply_visitor(FieldVisitorToNumberOfDimensions(&num_to_dimensions_status), field),
+            apply_visitor(FieldVisitorToNumberOfDimensions(), field),
     };
-    RETURN_IF_ERROR(num_to_dimensions_status);
-    return Status::OK();
 }
 
 ColumnObject::Subcolumn::Subcolumn(MutableColumnPtr&& data_, bool is_nullable_)
@@ -297,10 +299,10 @@ size_t ColumnObject::Subcolumn::Subcolumn::allocatedBytes() const {
     return res;
 }
 
-Status ColumnObject::Subcolumn::insert(Field field) {
+void ColumnObject::Subcolumn::insert(Field field) {
     FieldInfo info;
-    RETURN_IF_ERROR(get_field_info(field, &info));
-    return insert(std::move(field), std::move(info));
+    get_field_info(field, &info);
+    insert(std::move(field), std::move(info));
 }
 
 void ColumnObject::Subcolumn::add_new_column_part(DataTypePtr type) {
@@ -308,11 +310,11 @@ void ColumnObject::Subcolumn::add_new_column_part(DataTypePtr type) {
     least_common_type = LeastCommonType {std::move(type)};
 }
 
-Status ColumnObject::Subcolumn::insert(Field field, FieldInfo info) {
+void ColumnObject::Subcolumn::insert(Field field, FieldInfo info) {
     auto base_type = std::move(info.scalar_type);
     if (is_nothing(base_type)) {
         insertDefault();
-        return Status::OK();
+        return;
     }
     auto column_dim = least_common_type.get_dimensions();
     auto value_dim = info.num_dimensions;
@@ -323,8 +325,10 @@ Status ColumnObject::Subcolumn::insert(Field field, FieldInfo info) {
         value_dim = column_dim;
     }
     if (value_dim != column_dim) {
-        return Status::InvalidArgument(
-                "Dimension of types mismatched between inserted value and column.");
+        throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
+                               "Dimension of types mismatched between inserted value and column, "
+                               "expected:{}, but meet:{} for type:{}",
+                               column_dim, value_dim, least_common_type.get()->get_name());
     }
     if (is_nullable && !is_nothing(base_type)) {
         base_type = make_nullable(base_type);
@@ -348,9 +352,8 @@ Status ColumnObject::Subcolumn::insert(Field field, FieldInfo info) {
     } else if (!least_common_base_type->equals(*base_type) && !is_nothing(base_type)) {
         if (!schema_util::is_conversion_required_between_integers(*base_type,
                                                                   *least_common_base_type)) {
-            RETURN_IF_ERROR(
-                    get_least_supertype(DataTypes {std::move(base_type), least_common_base_type},
-                                        &base_type, true /*compatible with string type*/));
+            get_least_supertype(DataTypes {std::move(base_type), least_common_base_type},
+                                &base_type, true /*compatible with string type*/);
             type_changed = true;
             if (!least_common_base_type->equals(*base_type)) {
                 add_new_column_part(create_array_of_type(std::move(base_type), value_dim));
@@ -360,15 +363,14 @@ Status ColumnObject::Subcolumn::insert(Field field, FieldInfo info) {
 
     if (type_changed || info.need_convert) {
         Field new_field;
-        RETURN_IF_ERROR(convert_field_to_type(field, *least_common_type.get(), &new_field));
+        convert_field_to_type(field, *least_common_type.get(), &new_field);
         field = new_field;
     }
 
     data.back()->insert(field);
-    return Status::OK();
 }
 
-Status ColumnObject::Subcolumn::insertRangeFrom(const Subcolumn& src, size_t start, size_t length) {
+void ColumnObject::Subcolumn::insertRangeFrom(const Subcolumn& src, size_t start, size_t length) {
     assert(src.is_finalized());
     const auto& src_column = src.data.back();
     const auto& src_type = src.least_common_type.get();
@@ -379,18 +381,20 @@ Status ColumnObject::Subcolumn::insertRangeFrom(const Subcolumn& src, size_t sta
         data.back()->insert_range_from(*src_column, start, length);
     } else {
         DataTypePtr new_least_common_type = nullptr;
-        RETURN_IF_ERROR(get_least_supertype(DataTypes {least_common_type.get(), src_type},
-                                            &new_least_common_type,
-                                            true /*compatible with string type*/));
+        get_least_supertype(DataTypes {least_common_type.get(), src_type}, &new_least_common_type,
+                            true /*compatible with string type*/);
         ColumnPtr casted_column;
-        RETURN_IF_ERROR(schema_util::cast_column({src_column, src_type, ""}, new_least_common_type,
-                                                 &casted_column));
+        Status st = schema_util::cast_column({src_column, src_type, ""}, new_least_common_type,
+                                             &casted_column);
+        if (!st.ok()) {
+            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "{}, real_code:{}",
+                                   st.to_string(), st.code());
+        }
         if (!least_common_type.get()->equals(*new_least_common_type)) {
             add_new_column_part(std::move(new_least_common_type));
         }
         data.back()->insert_range_from(*casted_column, start, length);
     }
-    return Status::OK();
 }
 
 bool ColumnObject::Subcolumn::is_finalized() const {
@@ -400,7 +404,9 @@ bool ColumnObject::Subcolumn::is_finalized() const {
 template <typename Func>
 ColumnPtr ColumnObject::apply_for_subcolumns(Func&& func, std::string_view func_name) const {
     if (!is_finalized()) {
-        LOG(FATAL) << "Cannot " << func_name << " non-finalized ColumnObject";
+        // LOG(FATAL) << "Cannot " << func_name << " non-finalized ColumnObject";
+        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                               "Cannot {} non-finalized ColumnObject", func_name);
     }
     auto res = ColumnObject::create(is_nullable);
     for (const auto& subcolumn : subcolumns) {
@@ -560,7 +566,11 @@ void ColumnObject::check_consistency() const {
     }
     for (const auto& leaf : subcolumns) {
         if (num_rows != leaf->data.size()) {
-            assert(false);
+            // LOG(FATAL) << "unmatched column:" << leaf->path.get_path()
+            //            << ", expeted rows:" << num_rows << ", but meet:" << leaf->data.size();
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                   "unmatched column: {}, expeted rows: {}, but meet: {}",
+                                   leaf->path.get_path(), num_rows, leaf->data.size());
         }
     }
 }
@@ -575,7 +585,8 @@ size_t ColumnObject::size() const {
 MutableColumnPtr ColumnObject::clone_resized(size_t new_size) const {
     /// cloneResized with new_size == 0 is used for cloneEmpty().
     if (new_size != 0) {
-        LOG(FATAL) << "ColumnObject doesn't support resize to non-zero length";
+        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                               "ColumnObject doesn't support resize to non-zero length");
     }
     return ColumnObject::create(is_nullable);
 }
@@ -605,11 +616,11 @@ void ColumnObject::for_each_subcolumn(ColumnCallback callback) {
     }
 }
 
-Status ColumnObject::try_insert_from(const IColumn& src, size_t n) {
+void ColumnObject::try_insert_from(const IColumn& src, size_t n) {
     return try_insert(src[n]);
 }
 
-Status ColumnObject::try_insert(const Field& field) {
+void ColumnObject::try_insert(const Field& field) {
     const auto& object = field.get<const VariantMap&>();
     phmap::flat_hash_set<StringRef, StringRefHash> inserted;
     size_t old_size = size();
@@ -619,16 +630,16 @@ Status ColumnObject::try_insert(const Field& field) {
         if (!has_subcolumn(key)) {
             bool succ = add_sub_column(key, old_size);
             if (!succ) {
-                return Status::InvalidArgument(
-                        fmt::format("Failed to add sub column {}", key.get_path()));
+                throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
+                                       "Failed to add sub column {}", key.get_path());
             }
         }
         auto* subcolumn = get_subcolumn(key);
         if (!subcolumn) {
-            return Status::InvalidArgument(
-                    fmt::format("Failed to find sub column {}", key.get_path()));
+            throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
+                                   "Failed to find sub column {}", key.get_path());
         }
-        RETURN_IF_ERROR(subcolumn->insert(value));
+        subcolumn->insert(value);
     }
     for (auto& entry : subcolumns) {
         if (!inserted.contains(entry->path.get_path())) {
@@ -636,7 +647,6 @@ Status ColumnObject::try_insert(const Field& field) {
         }
     }
     ++num_rows;
-    return Status::OK();
 }
 
 void ColumnObject::insert_default() {
@@ -674,26 +684,26 @@ Status ColumnObject::try_insert_indices_from(const IColumn& src, const int* indi
         if (*x == -1) {
             ColumnObject::insert_default();
         } else {
-            RETURN_IF_ERROR(ColumnObject::try_insert_from(src, *x));
+            ColumnObject::try_insert_from(src, *x);
         }
     }
     finalize();
     return Status::OK();
 }
 
-Status ColumnObject::try_insert_range_from(const IColumn& src, size_t start, size_t length) {
+void ColumnObject::try_insert_range_from(const IColumn& src, size_t start, size_t length) {
     const auto& src_object = assert_cast<const ColumnObject&>(src);
     if (UNLIKELY(src_object.empty())) {
-        return Status::OK();
+        return;
     }
     for (auto& entry : subcolumns) {
         if (src_object.has_subcolumn(entry->path)) {
             auto* subcolumn = src_object.get_subcolumn(entry->path);
             if (!subcolumn) {
-                return Status::InvalidArgument(
-                        fmt::format("Failed to find sub column {}", entry->path.get_path()));
+                throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
+                                       "Failed to find sub column {}", entry->path.get_path());
             }
-            RETURN_IF_ERROR(entry->data.insertRangeFrom(*subcolumn, start, length));
+            entry->data.insertRangeFrom(*subcolumn, start, length);
         } else {
             entry->data.insertManyDefaults(length);
         }
@@ -714,20 +724,19 @@ Status ColumnObject::try_insert_range_from(const IColumn& src, size_t start, siz
                 succ = add_sub_column(entry->path, num_rows);
             }
             if (!succ) {
-                return Status::InvalidArgument(
-                        fmt::format("Failed to add column {}", entry->path.get_path()));
+                throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
+                                       "Failed to add column {}", entry->path.get_path());
             }
             auto* subcolumn = get_subcolumn(entry->path);
             if (!subcolumn) {
-                return Status::InvalidArgument(
-                        fmt::format("Failed to find sub column {}", entry->path.get_path()));
+                throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
+                                       "Failed to find sub column {}", entry->path.get_path());
             }
-            RETURN_IF_ERROR(subcolumn->insertRangeFrom(entry->data, start, length));
+            subcolumn->insertRangeFrom(entry->data, start, length);
         }
     }
     num_rows += length;
     finalize();
-    return Status::OK();
 }
 
 void ColumnObject::pop_back(size_t length) {
diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h
index 7fec48a0c8..339db8c6b9 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -66,7 +66,7 @@ struct FieldInfo {
     /// Number of dimension in array. 0 if field is scalar.
     size_t num_dimensions;
 };
-Status get_field_info(const Field& field, FieldInfo* info);
+void get_field_info(const Field& field, FieldInfo* info);
 /** A column that represents object with dynamic set of subcolumns.
  *  Subcolumns are identified by paths in document and are stored in
  *  a trie-like structure. ColumnObject is not suitable for writing into tables
@@ -109,16 +109,16 @@ public:
 
         /// Inserts a field, which scalars can be arbitrary, but number of
         /// dimensions should be consistent with current common type.
-        /// return Status::InvalidArgument when meet conflict types
-        Status insert(Field field);
+        /// throws InvalidArgument when meet conflict types
+        void insert(Field field);
 
-        Status insert(Field field, FieldInfo info);
+        void insert(Field field, FieldInfo info);
 
         void insertDefault();
 
         void insertManyDefaults(size_t length);
 
-        Status insertRangeFrom(const Subcolumn& src, size_t start, size_t length);
+        void insertRangeFrom(const Subcolumn& src, size_t start, size_t length);
 
         void pop_back(size_t n);
 
@@ -279,12 +279,7 @@ public:
     void for_each_subcolumn(ColumnCallback callback) override;
 
     // Do nothing, call try_insert instead
-    void insert(const Field& field) override {
-        Status st = try_insert(field);
-        if (!st.ok()) {
-            LOG(FATAL) << "insert return ERROR status: " << st;
-        }
-    }
+    void insert(const Field& field) override { try_insert(field); }
 
     void insert_range_from(const IColumn& src, size_t start, size_t length) override;
 
@@ -294,13 +289,12 @@ public:
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
 
-    // Only called in Block::add_row
-    Status try_insert(const Field& field);
+    // May throw exception
+    void try_insert(const Field& field);
 
-    Status try_insert_from(const IColumn& src, size_t n);
+    void try_insert_from(const IColumn& src, size_t n);
 
-    // Only called in Block::add_row
-    Status try_insert_range_from(const IColumn& src, size_t start, size_t length);
+    void try_insert_range_from(const IColumn& src, size_t start, size_t length);
 
     void insert_default() override;
 
diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp
index 7bcfacb1a0..798722d1b0 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -35,6 +35,7 @@
 #include <vector>
 
 #include "common/config.h"
+#include "common/status.h"
 #include "olap/olap_common.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
@@ -273,11 +274,11 @@ Status send_add_columns_rpc(ColumnsWithTypeAndName column_type_names,
     return Status::OK();
 }
 
-void unfold_object(size_t dynamic_col_position, Block& block, bool cast_to_original_type) {
+Status unfold_object(size_t dynamic_col_position, Block& block, bool cast_to_original_type) {
     auto dynamic_col = block.get_by_position(dynamic_col_position).column->assume_mutable();
     auto* column_object_ptr = assert_cast<ColumnObject*>(dynamic_col.get());
     if (column_object_ptr->empty()) {
-        return;
+        return Status::OK();
     }
     size_t num_rows = column_object_ptr->size();
     CHECK(block.rows() <= num_rows);
@@ -308,7 +309,8 @@ void unfold_object(size_t dynamic_col_position, Block& block, bool cast_to_origi
             }
             if (cast_to_original_type && !dst_type->equals(*types[i])) {
                 // Cast static columns to original slot type
-                schema_util::cast_column({subcolumns[i], types[i], ""}, dst_type, &column);
+                RETURN_IF_ERROR(
+                        schema_util::cast_column({subcolumns[i], types[i], ""}, dst_type, &column));
             }
             // replace original column
             column_type_name->column = column;
@@ -326,6 +328,16 @@ void unfold_object(size_t dynamic_col_position, Block& block, bool cast_to_origi
             entry.column->assume_mutable()->insert_many_defaults(num_rows - entry.column->size());
         }
     }
+#ifndef NDEBUG
+    for (const auto& column_type_name : block) {
+        if (column_type_name.column->size() != num_rows) {
+            LOG(FATAL) << "unmatched column:" << column_type_name.name
+                       << ", expeted rows:" << num_rows
+                       << ", but meet:" << column_type_name.column->size();
+        }
+    }
+#endif
+    return Status::OK();
 }
 
 void LocalSchemaChangeRecorder::add_extended_columns(const TabletColumn& new_column,
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index 978eaa19c7..fda6a69cf4 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -65,7 +65,7 @@ Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, Co
 // from object column and casted to the new type from slot_descs.
 // Also if column in block is empty, it will be filled
 // with num_rows of default values
-void unfold_object(size_t dynamic_col_position, Block& block, bool cast_to_original_type);
+Status unfold_object(size_t dynamic_col_position, Block& block, bool cast_to_original_type);
 
 /// If both of types are signed/unsigned integers and size of left field type
 /// is less than right type, we don't need to convert field,
diff --git a/be/src/vec/data_types/convert_field_to_type.cpp b/be/src/vec/data_types/convert_field_to_type.cpp
index bc7a821e64..a2a1377050 100644
--- a/be/src/vec/data_types/convert_field_to_type.cpp
+++ b/be/src/vec/data_types/convert_field_to_type.cpp
@@ -30,6 +30,8 @@
 #include <type_traits>
 #include <vector>
 
+#include "common/exception.h"
+#include "common/status.h"
 #include "vec/common/field_visitors.h"
 #include "vec/common/typeid_cast.h"
 #include "vec/core/accurate_comparison.h"
@@ -94,8 +96,9 @@ Field convert_numeric_type_impl(const Field& from) {
     }
     return result;
 }
+
 template <typename To>
-Status convert_numric_type(const Field& from, const IDataType& type, Field* to) {
+void convert_numric_type(const Field& from, const IDataType& type, Field* to) {
     if (from.get_type() == Field::Types::UInt64) {
         *to = convert_numeric_type_impl<UInt64, To>(from);
     } else if (from.get_type() == Field::Types::Int64) {
@@ -107,18 +110,17 @@ Status convert_numric_type(const Field& from, const IDataType& type, Field* to)
     } else if (from.get_type() == Field::Types::Int128) {
         *to = convert_numeric_type_impl<Int128, To>(from);
     } else {
-        return Status::InvalidArgument(
-                fmt::format("Type mismatch in IN or VALUES section. Expected: {}. Got: {}",
-                            type.get_name(), from.get_type()));
+        throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+                               "Type mismatch in IN or VALUES section. Expected: {}. Got: {}",
+                               type.get_name(), from.get_type());
     }
-    return Status::OK();
 }
 
-Status convert_field_to_typeImpl(const Field& src, const IDataType& type,
-                                 const IDataType* from_type_hint, Field* to) {
+void convert_field_to_typeImpl(const Field& src, const IDataType& type,
+                               const IDataType* from_type_hint, Field* to) {
     if (from_type_hint && from_type_hint->equals(type)) {
         *to = src;
-        return Status::OK();
+        return;
     }
     WhichDataType which_type(type);
     // TODO add more types
@@ -163,16 +165,16 @@ Status convert_field_to_typeImpl(const Field& src, const IDataType& type,
             src.get_type() == Field::Types::UInt64) {
             /// We don't need any conversion UInt64 is under type of Date and DateTime
             *to = src;
-            return Status::OK();
+            return;
         }
     } else if (which_type.is_string_or_fixed_string()) {
         if (src.get_type() == Field::Types::String) {
             *to = src;
-            return Status::OK();
+            return;
         }
         // TODO this is a very simple translator, support more complex types
         *to = apply_visitor(FieldVisitorToStringSimple(), src);
-        return Status::OK();
+        return;
     } else if (const DataTypeArray* type_array = typeid_cast<const DataTypeArray*>(&type)) {
         if (src.get_type() == Field::Types::Array) {
             const Array& src_arr = src.get<Array>();
@@ -180,14 +182,14 @@ Status convert_field_to_typeImpl(const Field& src, const IDataType& type,
             const auto& element_type = *(type_array->get_nested_type());
             Array res(src_arr_size);
             for (size_t i = 0; i < src_arr_size; ++i) {
-                RETURN_IF_ERROR(convert_field_to_type(src_arr[i], element_type, &res[i]));
+                convert_field_to_type(src_arr[i], element_type, &res[i]);
                 if (res[i].is_null() && !element_type.is_nullable()) {
-                    return Status::InvalidArgument(
-                            fmt::format("Cannot convert NULL to {}", element_type.get_name()));
+                    throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Cannot convert NULL to {}",
+                                           element_type.get_name());
                 }
             }
             *to = Field(res);
-            return Status::OK();
+            return;
         }
     }
     // else if (const DataTypeTuple* type_tuple = typeid_cast<const DataTypeTuple*>(&type)) {
@@ -229,31 +231,31 @@ Status convert_field_to_typeImpl(const Field& src, const IDataType& type,
     //         return Status::OK();
     //     }
     // }
-    return Status::InvalidArgument(
-            fmt::format("Type mismatch in IN or VALUES section. Expected: {}. Got: {}",
-                        type.get_name(), src.get_type()));
+    throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+                           "Type mismatch in IN or VALUES section. Expected: {}. Got: {}",
+                           type.get_name(), src.get_type());
 }
 } // namespace
-Status convert_field_to_type(const Field& from_value, const IDataType& to_type, Field* to,
-                             const IDataType* from_type_hint) {
+void convert_field_to_type(const Field& from_value, const IDataType& to_type, Field* to,
+                           const IDataType* from_type_hint) {
     if (from_value.is_null()) {
         *to = from_value;
-        return Status::OK();
+        return;
     }
     if (from_type_hint && from_type_hint->equals(to_type)) {
         *to = from_value;
-        return Status::OK();
+        return;
     }
     if (const auto* nullable_type = typeid_cast<const DataTypeNullable*>(&to_type)) {
         const IDataType& nested_type = *nullable_type->get_nested_type();
         /// NULL remains NULL after any conversion.
         if (WhichDataType(nested_type).is_nothing()) {
             *to = {};
-            return Status::OK();
+            return;
         }
         if (from_type_hint && from_type_hint->equals(nested_type)) {
             *to = from_value;
-            return Status::OK();
+            return;
         }
         return convert_field_to_typeImpl(from_value, nested_type, from_type_hint, to);
     } else {
diff --git a/be/src/vec/data_types/convert_field_to_type.h b/be/src/vec/data_types/convert_field_to_type.h
index 8d57994c31..07d48452a4 100644
--- a/be/src/vec/data_types/convert_field_to_type.h
+++ b/be/src/vec/data_types/convert_field_to_type.h
@@ -33,7 +33,7 @@ class IDataType;
   * If the value does not fall into the range - returns Null.
   */
 
-Status convert_field_to_type(const Field& from_value, const IDataType& to_type, Field* field,
-                             const IDataType* from_type_hint = nullptr);
+void convert_field_to_type(const Field& from_value, const IDataType& to_type, Field* field,
+                           const IDataType* from_type_hint = nullptr);
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/data_types/get_least_supertype.cpp b/be/src/vec/data_types/get_least_supertype.cpp
index 3d08a77443..be9dd5c05c 100644
--- a/be/src/vec/data_types/get_least_supertype.cpp
+++ b/be/src/vec/data_types/get_least_supertype.cpp
@@ -20,6 +20,7 @@
 
 #include "vec/data_types/get_least_supertype.h"
 
+#include <fmt/core.h>
 #include <fmt/format.h>
 #include <glog/logging.h>
 #include <stddef.h>
@@ -31,6 +32,7 @@
 #include <string>
 #include <vector>
 
+#include "common/status.h"
 #include "vec/common/typeid_cast.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
@@ -45,23 +47,36 @@ namespace doris::vectorized {
 
 namespace {
 
+String type_to_string(const DataTypePtr& type) {
+    return type->get_name();
+}
+String type_to_string(const TypeIndex& type) {
+    return getTypeName(type);
+}
+
+template <typename DataTypes>
 String get_exception_message_prefix(const DataTypes& types) {
     std::stringstream res;
     res << "There is no supertype for types ";
-
     bool first = true;
     for (const auto& type : types) {
         if (!first) res << ", ";
         first = false;
-
-        res << type->get_name();
+        res << type_to_string(type);
     }
-
     return res.str();
 }
+
 } // namespace
 
-Status get_numeric_type(const TypeIndexSet& types, DataTypePtr* type) {
+void get_numeric_type(const TypeIndexSet& types, DataTypePtr* type, bool compatible_with_string) {
+    auto throw_or_return = [&](std::string_view message, int error_code) {
+        if (compatible_with_string) {
+            *type = std::make_shared<DataTypeString>();
+            return;
+        }
+        throw doris::Exception(error_code, message);
+    };
     bool all_numbers = true;
 
     size_t max_bits_of_signed_integer = 0;
@@ -104,9 +119,11 @@ Status get_numeric_type(const TypeIndexSet& types, DataTypePtr* type) {
     if (max_bits_of_signed_integer || max_bits_of_unsigned_integer ||
         max_mantissa_bits_of_floating) {
         if (!all_numbers) {
-            LOG(INFO) << " because some of them are numbers and some of them are not";
             *type = nullptr;
-            return Status::InvalidArgument("some of them are numbers and some of them are not");
+            return throw_or_return(
+                    get_exception_message_prefix(types) +
+                            " because some of them are numbers and some of them are not",
+                    doris::ErrorCode::INVALID_ARGUMENT);
         }
 
         /// If there are signed and unsigned types of same bit-width, the result must be signed number with at least one more bit.
@@ -117,8 +134,9 @@ Status get_numeric_type(const TypeIndexSet& types, DataTypePtr* type) {
 
         /// If unsigned is not covered by signed.
         if (max_bits_of_signed_integer &&
-            max_bits_of_unsigned_integer >= max_bits_of_signed_integer)
+            max_bits_of_unsigned_integer >= max_bits_of_signed_integer) {
             ++min_bit_width_of_integer;
+        }
 
         /// If the result must be floating.
         if (max_mantissa_bits_of_floating) {
@@ -126,18 +144,21 @@ Status get_numeric_type(const TypeIndexSet& types, DataTypePtr* type) {
                     std::max(min_bit_width_of_integer, max_mantissa_bits_of_floating);
             if (min_mantissa_bits <= 24) {
                 *type = std::make_shared<DataTypeFloat32>();
-                return Status::OK();
+                return;
             } else if (min_mantissa_bits <= 53) {
                 *type = std::make_shared<DataTypeFloat64>();
-                return Status::OK();
+                return;
             } else {
                 LOG(INFO) << " because some of them are integers and some are floating point "
                              "but there is no floating point type, that can exactly represent "
                              "all required integers";
                 *type = nullptr;
-                return Status::InvalidArgument(
-                        "there is no floating point type, that can exactly represent "
-                        "all required integers");
+                return throw_or_return(
+                        get_exception_message_prefix(types) +
+                                " because some of them are integers and some are floating point "
+                                "but there is no floating point type, that can exactly represent "
+                                "all required integers",
+                        doris::ErrorCode::INVALID_ARGUMENT);
             }
         }
 
@@ -145,23 +166,25 @@ Status get_numeric_type(const TypeIndexSet& types, DataTypePtr* type) {
         if (max_bits_of_signed_integer) {
             if (min_bit_width_of_integer <= 8) {
                 *type = std::make_shared<DataTypeInt8>();
-                return Status::OK();
+                return;
             } else if (min_bit_width_of_integer <= 16) {
                 *type = std::make_shared<DataTypeInt16>();
-                return Status::OK();
+                return;
             } else if (min_bit_width_of_integer <= 32) {
                 *type = std::make_shared<DataTypeInt32>();
-                return Status::OK();
+                return;
             } else if (min_bit_width_of_integer <= 64) {
                 *type = std::make_shared<DataTypeInt64>();
-                return Status::OK();
+                return;
             } else {
                 LOG(INFO) << " because some of them are signed integers and some are unsigned "
                              "integers, but there is no signed integer type, that can exactly "
                              "represent all required unsigned integer values";
-                return Status::InvalidArgument(
-                        "there is no signed integer type, that can exactly "
-                        "represent all required unsigned integer values");
+                return throw_or_return(get_exception_message_prefix(types) +
+                        " because some of them are signed integers and some are unsigned "
+                        "integers, but there is no signed integer type, that can exactly "
+                        "represent all required unsigned integer values",
+                        doris::ErrorCode::INVALID_ARGUMENT);
             }
         }
 
@@ -169,43 +192,50 @@ Status get_numeric_type(const TypeIndexSet& types, DataTypePtr* type) {
         {
             if (min_bit_width_of_integer <= 8) {
                 *type = std::make_shared<DataTypeUInt8>();
-                return Status::OK();
+                return;
             } else if (min_bit_width_of_integer <= 16) {
                 *type = std::make_shared<DataTypeUInt16>();
-                return Status::OK();
+                return;
             } else if (min_bit_width_of_integer <= 32) {
                 *type = std::make_shared<DataTypeUInt32>();
-                return Status::OK();
+                return;
             } else if (min_bit_width_of_integer <= 64) {
                 *type = std::make_shared<DataTypeUInt64>();
-                return Status::OK();
+                return;
             } else {
-                LOG(FATAL) << "Logical error: "
-                           << "but as all data types are unsigned integers, we must have found "
-                              "maximum unsigned integer type";
+                LOG(WARNING) << "Logical error: "
+                             << "but as all data types are unsigned integers, we must have found "
+                                "maximum unsigned integer type";
                 *type = nullptr;
-                return Status::InvalidArgument(
-                        "all data types are unsigned integers, we must have found "
-                        "maximum unsigned integer type");
+                return throw_or_return(
+                        "Logical error: " + get_exception_message_prefix(types) +
+                                " but as all data types are unsigned integers, we must have found "
+                                "maximum unsigned integer type",
+                        doris::ErrorCode::INVALID_ARGUMENT);
             }
         }
     }
     *type = nullptr;
-    return Status::OK();
 }
 
 // TODO conflict type resolve
-Status get_least_supertype(const DataTypes& types, DataTypePtr* type, bool compatible_with_string) {
+void get_least_supertype(const DataTypes& types, DataTypePtr* type, bool compatible_with_string) {
     /// Trivial cases
-
+    auto throw_or_return = [&](std::string_view message, int error_code) {
+        if (compatible_with_string) {
+            *type = std::make_shared<DataTypeString>();
+            return;
+        }
+        throw doris::Exception(error_code, String(message));
+    };
     if (types.empty()) {
         *type = std::make_shared<DataTypeNothing>();
-        return Status::OK();
+        return;
     }
 
     if (types.size() == 1) {
         *type = types[0];
-        return Status::OK();
+        return;
     }
 
     /// All types are equal
@@ -220,7 +250,7 @@ Status get_least_supertype(const DataTypes& types, DataTypePtr* type, bool compa
 
         if (all_equal) {
             *type = types[0];
-            return Status::OK();
+            return;
         }
     }
 
@@ -231,12 +261,16 @@ Status get_least_supertype(const DataTypes& types, DataTypePtr* type, bool compa
         DataTypes non_nothing_types;
         non_nothing_types.reserve(types.size());
 
-        for (const auto& type : types)
-            if (!typeid_cast<const DataTypeNothing*>(type.get()))
+        for (const auto& type : types) {
+            if (!typeid_cast<const DataTypeNothing*>(type.get())) {
                 non_nothing_types.emplace_back(type);
+            }
+        }
 
-        if (non_nothing_types.size() < types.size())
-            return get_least_supertype(non_nothing_types, type, compatible_with_string);
+        if (non_nothing_types.size() < types.size()) {
+            get_least_supertype(non_nothing_types, type, compatible_with_string);
+            return;
+        }
     }
 
     /// For Nullable
@@ -251,27 +285,28 @@ Status get_least_supertype(const DataTypes& types, DataTypePtr* type, bool compa
                         typeid_cast<const DataTypeNullable*>(type.get())) {
                 have_nullable = true;
 
-                if (!type_nullable->only_null())
+                if (!type_nullable->only_null()) {
                     nested_types.emplace_back(type_nullable->get_nested_type());
-            } else
+                }
+            } else {
                 nested_types.emplace_back(type);
+            }
         }
 
         if (have_nullable) {
             DataTypePtr nested_type;
-            Status st = get_least_supertype(nested_types, &nested_type, compatible_with_string);
-            if (!st.ok()) {
-                return st;
-            }
+            get_least_supertype(nested_types, &nested_type, compatible_with_string);
             *type = std::make_shared<DataTypeNullable>(nested_type);
-            return st;
+            return;
         }
     }
 
     /// Non-recursive rules
 
     phmap::flat_hash_set<TypeIndex> type_ids;
-    for (const auto& type : types) type_ids.insert(type->get_type_id());
+    for (const auto& type : types) {
+        type_ids.insert(type->get_type_id());
+    }
 
     /// For String and FixedString, or for different FixedStrings, the common type is String.
     /// No other types are compatible with Strings. TODO Enums?
@@ -285,12 +320,14 @@ Status get_least_supertype(const DataTypes& types, DataTypePtr* type, bool compa
                 LOG(INFO)
                         << get_exception_message_prefix(types)
                         << " because some of them are String/FixedString and some of them are not";
-                return Status::InvalidArgument(
-                        "some of them are String/FixedString and some of them are not");
+                return throw_or_return(get_exception_message_prefix(types) +
+                                               " because some of them are String/FixedString and "
+                                               "some of them are not",
+                                       doris::ErrorCode::INVALID_ARGUMENT);
             }
 
             *type = std::make_shared<DataTypeString>();
-            return Status::OK();
+            return;
         }
     }
 
@@ -304,12 +341,14 @@ Status get_least_supertype(const DataTypes& types, DataTypePtr* type, bool compa
             if (!all_date_or_datetime) {
                 LOG(INFO) << get_exception_message_prefix(types)
                           << " because some of them are Date/DateTime and some of them are not";
-                return Status::InvalidArgument(
-                        "because some of them are Date/DateTime and some of them are not");
+                return throw_or_return(
+                        get_exception_message_prefix(types) +
+                                " because some of them are Date/DateTime and some of them are not",
+                        doris::ErrorCode::INVALID_ARGUMENT);
             }
 
             *type = std::make_shared<DataTypeDateTime>();
-            return Status::OK();
+            return;
         }
     }
 
@@ -340,8 +379,10 @@ Status get_least_supertype(const DataTypes& types, DataTypePtr* type, bool compa
             if (num_supported != type_ids.size()) {
                 LOG(INFO) << get_exception_message_prefix(types)
                           << " because some of them have no lossless convertion to Decimal";
-                return Status::InvalidArgument(
-                        "some of them have no lossless convertion to Decimal");
+                return throw_or_return(
+                        get_exception_message_prefix(types) +
+                                " because some of them have no lossless convertion to Decimal",
+                        doris::ErrorCode::INVALID_ARGUMENT);
             }
 
             UInt32 max_scale = 0;
@@ -364,50 +405,58 @@ Status get_least_supertype(const DataTypes& types, DataTypePtr* type, bool compa
                 LOG(INFO) << fmt::format("{} because the least supertype is Decimal({},{})",
                                          get_exception_message_prefix(types), min_precision,
                                          max_scale);
-                return Status::InvalidArgument(
-                        fmt::format("{} because the least supertype is Decimal({},{})",
-                                    get_exception_message_prefix(types), min_precision, max_scale));
+                return throw_or_return(get_exception_message_prefix(types) +
+                                               fmt::format(" because some of them have no lossless "
+                                                           "convertion to Decimal({},{})",
+                                                           min_precision, max_scale),
+                                       doris::ErrorCode::INVALID_ARGUMENT);
             }
 
             if (have_decimal128 || min_precision > DataTypeDecimal<Decimal64>::max_precision()) {
                 *type = std::make_shared<DataTypeDecimal<Decimal128>>(
                         DataTypeDecimal<Decimal128>::max_precision(), max_scale);
-                return Status::OK();
+                return;
             }
             if (have_decimal128i || min_precision > DataTypeDecimal<Decimal64>::max_precision()) {
                 *type = std::make_shared<DataTypeDecimal<Decimal128I>>(
                         DataTypeDecimal<Decimal128I>::max_precision(), max_scale);
-                return Status::OK();
+                return;
             }
             if (have_decimal64 || min_precision > DataTypeDecimal<Decimal32>::max_precision()) {
                 *type = std::make_shared<DataTypeDecimal<Decimal64>>(
                         DataTypeDecimal<Decimal64>::max_precision(), max_scale);
-                return Status::OK();
+                return;
             }
             *type = std::make_shared<DataTypeDecimal<Decimal32>>(
                     DataTypeDecimal<Decimal32>::max_precision(), max_scale);
-            return Status::OK();
+            return;
         }
     }
 
     /// For numeric types, the most complicated part.
     {
         DataTypePtr numeric_type = nullptr;
-        Status st = get_numeric_type(type_ids, &numeric_type);
+        get_numeric_type(type_ids, &numeric_type, compatible_with_string);
         if (numeric_type) {
-            DCHECK(st.ok());
             *type = numeric_type;
-            return Status::OK();
+            return;
         }
     }
 
     /// All other data types (UUID, AggregateFunction, Enum...) are compatible only if they are the same (checked in trivial cases).
     *type = nullptr;
-    return Status::InvalidArgument(get_exception_message_prefix(types));
+    return throw_or_return(get_exception_message_prefix(types), ErrorCode::INVALID_ARGUMENT);
 }
 
-Status get_least_supertype(const TypeIndexSet& types, DataTypePtr* type,
-                           bool compatible_with_string) {
+void get_least_supertype(const TypeIndexSet& types, DataTypePtr* type,
+                         bool compatible_with_string) {
+    auto throw_or_return = [&](std::string_view message, int error_code) {
+        if (compatible_with_string) {
+            *type = std::make_shared<DataTypeString>();
+            return;
+        }
+        throw doris::Exception(error_code, String(message));
+    };
     TypeIndexSet types_set;
     for (const auto& t : types) {
         if (WhichDataType(t).is_nothing()) continue;
@@ -416,8 +465,9 @@ Status get_least_supertype(const TypeIndexSet& types, DataTypePtr* type,
             LOG(INFO) << "Cannot get common type by type ids with parametric type"
                       << getTypeName(t);
             *type = nullptr;
-            return Status::InvalidArgument(
-                    "Cannot get common type by type ids with parametric type");
+            throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+                                   "Cannot get common type by type ids with parametric type {}",
+                                   type_to_string(t));
         }
 
         types_set.insert(t);
@@ -425,31 +475,33 @@ Status get_least_supertype(const TypeIndexSet& types, DataTypePtr* type,
 
     if (types_set.empty()) {
         *type = std::make_shared<DataTypeNothing>();
-        return Status::OK();
+        return;
     }
 
     if (types.count(TypeIndex::String)) {
-        if (types.size() != 1 && !compatible_with_string) {
+        if (types.size() != 1) {
             LOG(INFO) << " because some of them are String and some of them are not";
             *type = nullptr;
-            return Status::InvalidArgument("some of them are String and some of them are not");
+            return throw_or_return(
+                    get_exception_message_prefix(types) +
+                            " because some of them are String and some of them are not",
+                    ErrorCode::INVALID_ARGUMENT);
         }
 
         *type = std::make_shared<DataTypeString>();
-        return Status::OK();
+        return;
     }
 
     /// For numeric types, the most complicated part.
     DataTypePtr numeric_type = nullptr;
-    Status st = get_numeric_type(types, &numeric_type);
+    get_numeric_type(types, &numeric_type, compatible_with_string);
     if (numeric_type) {
-        DCHECK(st.ok());
         *type = numeric_type;
-        return Status::OK();
+        return;
     }
     /// All other data types (UUID, AggregateFunction, Enum...) are compatible only if they are the same (checked in trivial cases).
     *type = nullptr;
-    return Status::InvalidArgument("unknown type");
+    return throw_or_return(get_exception_message_prefix(types), ErrorCode::INVALID_ARGUMENT);
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/data_types/get_least_supertype.h b/be/src/vec/data_types/get_least_supertype.h
index ad52f553df..72a6fc31f2 100644
--- a/be/src/vec/data_types/get_least_supertype.h
+++ b/be/src/vec/data_types/get_least_supertype.h
@@ -42,10 +42,10 @@ namespace doris::vectorized {
 
 using TypeIndexSet = phmap::flat_hash_set<TypeIndex>;
 
-Status get_least_supertype(const DataTypes& types, DataTypePtr* type,
-                           bool compatible_with_string = false);
+void get_least_supertype(const DataTypes& types, DataTypePtr* type,
+                         bool compatible_with_string = false);
 
-Status get_least_supertype(const TypeIndexSet& types, DataTypePtr* type,
-                           bool compatible_with_string = false);
+void get_least_supertype(const TypeIndexSet& types, DataTypePtr* type,
+                         bool compatible_with_string = false);
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index daa549776b..5925515200 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -39,6 +39,7 @@
 
 // IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/status.h"
 #include "exprs/json_functions.h"
 #include "io/file_factory.h"
 #include "io/fs/buffered_reader.h"
@@ -460,8 +461,7 @@ Status NewJsonReader::_parse_dynamic_json(bool* is_empty_row, bool* eof, Block&
     _bytes_read_counter += size;
     auto& dynamic_column = block.get_columns().back()->assume_mutable_ref();
     auto& column_object = assert_cast<vectorized::ColumnObject&>(dynamic_column);
-    Defer __finalize_clousure([&] {
-        // Reached buffer size, unfold intermediate column object
+    auto finalize_column = [&]() -> Status {
         size_t batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE);
         if (column_object.size() >= batch_size || _reader_eof) {
             column_object.finalize();
@@ -470,35 +470,22 @@ Status NewJsonReader::_parse_dynamic_json(bool* is_empty_row, bool* eof, Block&
             }
             // Unfold object columns for the purpose of extracting static columns and
             // fill default values missing in static columns
-            schema_util::unfold_object(block.columns() - 1, block,
-                                       true /*cast to original column type*/);
+            RETURN_IF_ERROR(schema_util::unfold_object(block.columns() - 1, block,
+                                                       true /*cast to original column type*/));
         }
-    });
+        return Status::OK();
+    };
     // read all data, then return
     if (size == 0 || *eof) {
         *is_empty_row = true;
+        RETURN_IF_ERROR(finalize_column());
         return Status::OK();
     }
-    Status st = doris::vectorized::parse_json_to_variant(column_object, StringRef {json_str, size},
-                                                         _json_parser.get());
-    if (st.is<DATA_QUALITY_ERROR>()) {
-        fmt::memory_buffer error_msg;
-        fmt::format_to(error_msg, "Parse json data for JsonDoc failed. error info: {}",
-                       st.to_string());
-        RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                [&]() -> std::string { return std::string((char*)json_str, size); },
-                [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
-        _counter->num_rows_filtered++;
-        if (*_scanner_eof) {
-            // Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means
-            // we meet enough invalid rows and the scanner should be stopped.
-            // So we set eof to true and return OK, the caller will stop the process as we meet the end of file.
-            *eof = true;
-            return Status::OK();
-        }
-        return Status::DataQualityError(fmt::to_string(error_msg));
-    }
 
+    RETURN_IF_CATCH_EXCEPTION(doris::vectorized::parse_json_to_variant(
+            column_object, StringRef {json_str, size}, _json_parser.get()));
+    // TODO correctly handle data quality error
+    RETURN_IF_ERROR(finalize_column());
     return Status::OK();
 }
 
diff --git a/be/src/vec/json/json_parser.cpp b/be/src/vec/json/json_parser.cpp
index 95dd0034c3..a05a1737b9 100644
--- a/be/src/vec/json/json_parser.cpp
+++ b/be/src/vec/json/json_parser.cpp
@@ -161,8 +161,9 @@ void JSONDataParser<ParserImpl>::traverseArrayElement(const Element& element,
                 if (current_nested_sizes.size() == ctx.current_size) {
                     current_nested_sizes.push_back(array_size);
                 } else if (array_size != current_nested_sizes.back()) {
-                    LOG(FATAL) << fmt::format("Array sizes mismatched ({} and {})", array_size,
-                                              current_nested_sizes.back());
+                    throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                           "Array sizes mismatched ({} and {})", array_size,
+                                           current_nested_sizes.back());
                 }
             }
             path_array.push_back(std::move(values[i]));
diff --git a/be/src/vec/json/parse2column.cpp b/be/src/vec/json/parse2column.cpp
index 7b12d3e7cd..1ae5d0d591 100644
--- a/be/src/vec/json/parse2column.cpp
+++ b/be/src/vec/json/parse2column.cpp
@@ -192,8 +192,8 @@ bool try_insert_default_from_nested(const std::shared_ptr<Node>& entry,
 }
 
 template <typename ParserImpl>
-Status parse_json_to_variant(IColumn& column, const char* src, size_t length,
-                             JSONDataParser<ParserImpl>* parser) {
+void parse_json_to_variant(IColumn& column, const char* src, size_t length,
+                           JSONDataParser<ParserImpl>* parser) {
     auto& column_object = assert_cast<ColumnObject&>(column);
     std::optional<ParseResult> result;
     /// Treat empty string as an empty object
@@ -205,8 +205,8 @@ Status parse_json_to_variant(IColumn& column, const char* src, size_t length,
     }
     if (!result) {
         LOG(INFO) << "failed to parse " << std::string_view(src, length) << ", length= " << length;
-        return Status::DataQualityError(
-                fmt::format("Failed to parse object {}", std::string_view(src, length)));
+        throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to parse object {}",
+                               std::string_view(src, length));
     }
     auto& [paths, values] = *result;
     assert(paths.size() == values.size());
@@ -214,18 +214,21 @@ Status parse_json_to_variant(IColumn& column, const char* src, size_t length,
     size_t num_rows = column_object.size();
     for (size_t i = 0; i < paths.size(); ++i) {
         FieldInfo field_info;
-        RETURN_IF_ERROR(get_field_info(values[i], &field_info));
+        get_field_info(values[i], &field_info);
         // TODO support multi dimensions array
         if (!config::enable_parse_multi_dimession_array && field_info.num_dimensions >= 2) {
-            return Status::DataQualityError(
+            throw doris::Exception(
+                    ErrorCode::INVALID_ARGUMENT,
                     "Sorry multi dimensions array is not supported now, we are working on it");
         }
         if (is_nothing(field_info.scalar_type)) {
             continue;
         }
         if (!paths_set.insert(paths[i].get_path()).second) {
-            return Status::DataQualityError(
-                    fmt::format("Object has ambiguous path {}, {}", paths[i].get_path()));
+            // return Status::DataQualityError(
+            //         fmt::format("Object has ambiguous path {}, {}", paths[i].get_path()));
+            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Object has ambiguous path {}",
+                                   paths[i].get_path());
         }
 
         if (!column_object.has_subcolumn(paths[i])) {
@@ -237,16 +240,11 @@ Status parse_json_to_variant(IColumn& column, const char* src, size_t length,
         }
         auto* subcolumn = column_object.get_subcolumn(paths[i]);
         if (!subcolumn) {
-            return Status::DataQualityError(
-                    fmt::format("Failed to find sub column {}", paths[i].get_path()));
+            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to find sub column {}",
+                                   paths[i].get_path());
         }
         assert(subcolumn->size() == num_rows);
-        Status st = subcolumn->insert(std::move(values[i]), std::move(field_info));
-        if (st.is_invalid_argument()) {
-            return Status::DataQualityError(
-                    fmt::format("Failed to insert field {}", st.to_string()));
-        }
-        RETURN_IF_ERROR(st);
+        subcolumn->insert(std::move(values[i]), std::move(field_info));
     }
     // /// Insert default values to missed subcolumns.
     const auto& subcolumns = column_object.get_subcolumns();
@@ -259,7 +257,6 @@ Status parse_json_to_variant(IColumn& column, const char* src, size_t length,
         }
     }
     column_object.incr_num_rows();
-    return Status::OK();
 }
 
 bool extract_key(MutableColumns& columns, StringRef json, const std::vector<StringRef>& keys,
@@ -268,17 +265,16 @@ bool extract_key(MutableColumns& columns, StringRef json, const std::vector<Stri
 }
 
 // exposed interfaces
-Status parse_json_to_variant(IColumn& column, const StringRef& json,
-                             JSONDataParser<SimdJSONParser>* parser) {
+void parse_json_to_variant(IColumn& column, const StringRef& json,
+                           JSONDataParser<SimdJSONParser>* parser) {
     return parse_json_to_variant(column, json.data, json.size, parser);
 }
 
-Status parse_json_to_variant(IColumn& column, const std::vector<StringRef>& jsons) {
+void parse_json_to_variant(IColumn& column, const std::vector<StringRef>& jsons) {
     auto parser = parsers_pool.get([] { return new JSONDataParser<SimdJSONParser>(); });
     for (StringRef str : jsons) {
-        RETURN_IF_ERROR(parse_json_to_variant(column, str.data, str.size, parser.get()));
+        parse_json_to_variant(column, str.data, str.size, parser.get());
     }
-    return Status::OK();
 }
 
 bool extract_key(MutableColumns& columns, const std::vector<StringRef>& jsons,
diff --git a/be/src/vec/json/parse2column.h b/be/src/vec/json/parse2column.h
index 0bc13dd591..4e78d98164 100644
--- a/be/src/vec/json/parse2column.h
+++ b/be/src/vec/json/parse2column.h
@@ -35,12 +35,12 @@ class JSONDataParser;
 
 namespace doris::vectorized {
 
-// parse a batch of json strings into column object
-Status parse_json_to_variant(IColumn& column, const std::vector<StringRef>& jsons);
+// Parse a batch of JSON strings into a column object; throws doris::Exception on failure.
+void parse_json_to_variant(IColumn& column, const std::vector<StringRef>& jsons);
 
-// parse a single json
-Status parse_json_to_variant(IColumn& column, const StringRef& jsons,
-                             JSONDataParser<SimdJSONParser>* parser);
+// Parse a single JSON string; throws doris::Exception on failure.
+void parse_json_to_variant(IColumn& column, const StringRef& jsons,
+                           JSONDataParser<SimdJSONParser>* parser);
 
 // extract keys columns from json strings into columns
 bool extract_key(MutableColumns& columns, const std::vector<StringRef>& jsons,
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 3070975a3c..1deff5f6d2 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -816,7 +816,7 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorStruct::convert_to_olap()
         DataTypePtr sub_type = data_type_struct->get_element(i);
         ColumnWithTypeAndName sub_typed_column = {sub_column, sub_type, ""};
         _sub_convertors[i]->set_source_column(sub_typed_column, _row_pos, _num_rows);
-        _sub_convertors[i]->convert_to_olap();
+        RETURN_IF_ERROR(_sub_convertors[i]->convert_to_olap());
         _results[data_cursor] = _sub_convertors[i]->get_data();
         _results[null_map_cursor] = _sub_convertors[i]->get_nullmap();
         data_cursor++;
@@ -867,7 +867,7 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorArray::convert_to_olap(
     ColumnWithTypeAndName item_typed_column = {
             item_data, remove_nullable(data_type_array->get_nested_type()), ""};
     _item_convertor->set_source_column(item_typed_column, start, size);
-    _item_convertor->convert_to_olap();
+    RETURN_IF_ERROR(_item_convertor->convert_to_olap());
 
     CollectionValue* collection_value = _values.data();
     for (size_t i = 0; i < _num_rows; ++i, ++collection_value) {
@@ -954,12 +954,12 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
     _base_offset += elem_size;
     ColumnWithTypeAndName key_typed_column = {key_data, data_type_map->get_key_type(), "map.key"};
     _key_convertor->set_source_column(key_typed_column, start_offset, elem_size);
-    _key_convertor->convert_to_olap();
+    RETURN_IF_ERROR(_key_convertor->convert_to_olap());
 
     ColumnWithTypeAndName value_typed_column = {value_data, data_type_map->get_value_type(),
                                                 "map.value"};
     _value_convertor->set_source_column(value_typed_column, start_offset, elem_size);
-    _value_convertor->convert_to_olap();
+    RETURN_IF_ERROR(_value_convertor->convert_to_olap());
 
     // todo (Amory). put this value into MapValue
     _results[0] = (void*)elem_size;
diff --git a/lowercase.json b/lowercase.json
new file mode 100644
index 0000000000..f2728cd0aa
--- /dev/null
+++ b/lowercase.json
@@ -0,0 +1,5 @@
+{"xxxx": 1234}
+{"xxxx": 12345678}
+{"xxxx": "5679"}
+{"XXXX": "5679"}
+{"YYY": "5679"}
diff --git a/regression-test/data/dynamic_table_p0/floating_point.json b/regression-test/data/dynamic_table_p0/floating_point.json
new file mode 100644
index 0000000000..9d4e742a0e
--- /dev/null
+++ b/regression-test/data/dynamic_table_p0/floating_point.json
@@ -0,0 +1,4 @@
+{"type":1}
+{"c": 10000000000}
+{"c":1.0}
+{"ca":1}
diff --git a/regression-test/data/dynamic_table_p0/floating_point2.json b/regression-test/data/dynamic_table_p0/floating_point2.json
new file mode 100644
index 0000000000..5ad60cd4fc
--- /dev/null
+++ b/regression-test/data/dynamic_table_p0/floating_point2.json
@@ -0,0 +1,4 @@
+{"type":1}
+{"a": 10000000000}
+{"a":1.0}
+{"a":1}
diff --git a/regression-test/data/dynamic_table_p0/floating_point3.json b/regression-test/data/dynamic_table_p0/floating_point3.json
new file mode 100644
index 0000000000..eab9e4c757
--- /dev/null
+++ b/regression-test/data/dynamic_table_p0/floating_point3.json
@@ -0,0 +1,4 @@
+{"type":1}
+{"c": "10110101"}
+{"c":1991.0222}
+{"c":100000}
diff --git a/regression-test/data/dynamic_table_p0/invalid_dimension.json b/regression-test/data/dynamic_table_p0/invalid_dimension.json
new file mode 100644
index 0000000000..23311c2692
--- /dev/null
+++ b/regression-test/data/dynamic_table_p0/invalid_dimension.json
@@ -0,0 +1,4 @@
+{"type":1}
+{"b":[1, 2]}
+{"b":""}
+{"b":1}
diff --git a/regression-test/data/dynamic_table_p0/invalid_format.json b/regression-test/data/dynamic_table_p0/invalid_format.json
new file mode 100644
index 0000000000..d9eca95a2a
--- /dev/null
+++ b/regression-test/data/dynamic_table_p0/invalid_format.json
@@ -0,0 +1,3 @@
+{"type":1}
+{"a:[1, 2]}
+{"a":1}
diff --git a/regression-test/data/dynamic_table_p0/load.out b/regression-test/data/dynamic_table_p0/load.out
new file mode 100644
index 0000000000..59e01e1dd4
--- /dev/null
+++ b/regression-test/data/dynamic_table_p0/load.out
@@ -0,0 +1,1939 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+42890
+
+-- !sql --
+42890
+
+-- !sql --
+42890
+
+-- !sql --
+42890
+
+-- !sql --
+42890
+
+-- !sql --
+42890
+
+-- !sql --
+42890
+
+-- !sql --
+42890
+
+-- !sql --
+42890
+
+-- !sql --
+42890
+
+-- !sql --
+42890
+
+-- !sql --
+42890
+
+-- !sql --
+11924
+
+-- !sql --
+23413
+
+-- !sql --
+23413
+
+-- !sql --
+23413
+
+-- !sql --
+27232
+
+-- !sql --
+23413
+
+-- !sql --
+23413
+
+-- !sql --
+38488
+
+-- !sql --
+38488
+
+-- !sql --
+38488
+
+-- !sql --
+38488
+
+-- !sql --
+38488
+
+-- !sql --
+38488
+
+-- !sql --
+12051
+
+-- !sql --
+12051
+
+-- !sql --
+12051
+
+-- !sql --
+12051
+
+-- !sql --
+12051
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5369
+
+-- !sql --
+5339
+
+-- !sql --
+5592
+
+-- !sql --
+5592
+
+-- !sql --
+4768
+
+-- !sql --
+4588
+
+-- !sql --
+1035
+
+-- !sql --
+1035
+
+-- !sql --
+1035
+
+-- !sql --
+1035
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+3407
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+4084
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1298
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+789
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+1319
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+175
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+70
+
+-- !sql --
+181
+
+-- !sql --
+181
+
+-- !sql --
+175
+
+-- !sql --
+1719
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2115
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2087
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+1952
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2142
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2094
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+1972
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+2151
+
+-- !sql --
+1719
+
+-- !sql --
+1719
+
+-- !sql --
+1719
+
+-- !sql --
+1719
+
+-- !sql --
+1719
+
+-- !sql --
+1719
+
+-- !sql --
+1719
+
+-- !sql --
+1719
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+524
+
+-- !sql --
+1721
+
+-- !sql --
+432
+
+-- !sql --
+656
+
+-- !sql --
+520
+
+-- !sql --
+432
+
+-- !sql --
+677
+
+-- !sql --
+432
+
+-- !sql --
+432
+
+-- !sql --
+432
+
+-- !sql --
+432
+
+-- !sql --
+432
+
+-- !sql --
+1305
+
+-- !sql --
+1135
+
+-- !sql --
+1167
+
+-- !sql --
+2189
+
+-- !sql --
+2189
+
+-- !sql --
+2189
+
+-- !sql --
+843
+
+-- !sql --
+680
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+660
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+119
+
+-- !sql --
+223
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+438
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+515
+
+-- !sql --
+195
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+52
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+40
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+58
+
+-- !sql --
+15
+
+-- !sql --
+391
+
+-- !sql --
+391
+
+-- !sql --
+391
+
+-- !sql --
+391
+
+-- !sql --
+391
+
+-- !sql --
+98
+
+-- !sql --
+79
+
+-- !sql --
+2
+
+-- !sql --
+12
+
+-- !sql --
+16
+
+-- !sql --
+220
+
+-- !sql --
+42
+
diff --git a/regression-test/data/dynamic_table_p0/lowercase.json b/regression-test/data/dynamic_table_p0/lowercase.json
new file mode 100644
index 0000000000..b5fed5f0a0
--- /dev/null
+++ b/regression-test/data/dynamic_table_p0/lowercase.json
@@ -0,0 +1,4 @@
+{"xxx": 1234}
+{"xxx": "5679"}
+{"XXX": "5679"}
+{"XXX": "5679"}
diff --git a/regression-test/data/dynamic_table_p0/sql/q05.out b/regression-test/data/dynamic_table_p0/sql/q05.out
index e53460d268..583b8101a4 100644
--- a/regression-test/data/dynamic_table_p0/sql/q05.out
+++ b/regression-test/data/dynamic_table_p0/sql/q05.out
@@ -3,27 +3,27 @@
 0
 
 -- !q05_2 --
-30417
+42890
 
 -- !q05_3 --
+oliver006/elasticsearch-gmail	37
+prakhar1989/awesome-courses	28
 getguesstimate/guesstimate-app	26
+cachethq/Cachet	17
 ericelliott/essential-javascript-links	16
-FreeCodeCamp/FreeCodeCamp	14
-tj/frontend-boilerplate	14
-prakhar1989/awesome-courses	12
 
 -- !q05_4 --
 3487211075
 
 -- !q05_5 --
-94350289813772
+125406378528208
 
 -- !q05_6 --
-518892546
+704293861
 
 -- !q05_7 --
 0
 
 -- !q05_8 --
-56947
+85450
 
diff --git a/regression-test/data/dynamic_table_p0/uppercase.json b/regression-test/data/dynamic_table_p0/uppercase.json
new file mode 100644
index 0000000000..a1236a9ec9
--- /dev/null
+++ b/regression-test/data/dynamic_table_p0/uppercase.json
@@ -0,0 +1,4 @@
+{"A":1}
+{"B":1}
+{"C":1}
+{"D":1}
diff --git a/regression-test/suites/dynamic_table_p0/load.groovy b/regression-test/suites/dynamic_table_p0/load.groovy
index 4837ca3cd1..6ac6836194 100644
--- a/regression-test/suites/dynamic_table_p0/load.groovy
+++ b/regression-test/suites/dynamic_table_p0/load.groovy
@@ -126,6 +126,28 @@ suite("regression_test_dynamic_table", "dynamic_table"){
     sql """insert into test_ghdata_json_unique select * from test_ghdata_json"""
     sql """insert into test_btc_json_unique select * from test_btc_json"""
 
+    // abnormal cases
+    table_name = "abnormal_cases" 
+    sql """
+            DROP TABLE IF EXISTS ${table_name};
+    """
+    sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                qid bigint,
+                XXXX bigint,
+		        ...
+            )
+            DUPLICATE KEY(`qid`)
+            DISTRIBUTED BY HASH(`qid`) BUCKETS 5 
+            properties("replication_num" = "1");
+    """
+    load_json_data.call(table_name, 'true', 'json', 'true', "invalid_dimension.json", 'false')
+    load_json_data.call(table_name, 'true', 'json', 'true', "invalid_format.json", 'false')
+    load_json_data.call(table_name, 'true', 'json', 'true', "floating_point.json", 'true')
+    load_json_data.call(table_name, 'true', 'json', 'true', "floating_point2.json", 'true')
+    load_json_data.call(table_name, 'true', 'json', 'true', "floating_point3.json", 'true')
+    load_json_data.call(table_name, 'true', 'json', 'true', "uppercase.json", 'true')
+
     // load more
     table_name = "gharchive";
     sql "DROP TABLE IF EXISTS ${table_name}"
@@ -145,6 +167,7 @@ suite("regression_test_dynamic_table", "dynamic_table"){
         ); 
         """
     def paths = [
+        """${getS3Url() + '/regression/gharchive/2015-01-01-22.json'}""",
         """${getS3Url() + '/regression/gharchive/2015-01-01-16.json'}""",
         """${getS3Url() + '/regression/gharchive/2016-01-01-16.json'}""",
     ]
@@ -176,4 +199,58 @@ suite("regression_test_dynamic_table", "dynamic_table"){
             }
         }
     }
+
+    sql 'sync'
+    meta = sql_meta 'select * from gharchive limit 1'
+    def array_cols = [
+        "payload.commits.url",
+        "payload.commits.sha",
+        "payload.commits.author.email",
+        "payload.commits.distinct",
+        "payload.commits.author.name",
+        "payload.commits.message",
+        "payload.issue.labels.name",
+        "payload.issue.labels.color",
+        "payload.issue.labels.url",
+        "payload.pages.title",
+        "payload.pages.html_url",
+        "payload.pages.sha",
+        "payload.pages.action",
+        "payload.pages.page_name",
+        "payload.release.assets.uploader.repos_url",
+        "payload.release.assets.uploader.id",
+        "payload.release.assets.uploader.organizations_url",
+        "payload.release.assets.uploader.received_events_url",
+        "payload.release.assets.uploader.site_admin",
+        "payload.release.assets.uploader.subscriptions_url",
+        "payload.release.assets.state",
+        "payload.release.assets.size",
+        "payload.release.assets.uploader.following_url",
+        "payload.release.assets.uploader.starred_url",
+        "payload.release.assets.download_count",
+        "payload.release.assets.created_at",
+        "payload.release.assets.updated_at",
+        "payload.release.assets.browser_download_url",
+        "payload.release.assets.url",
+        "payload.release.assets.uploader.gravatar_id",
+        "payload.release.assets.uploader.gists_url",
+        "payload.release.assets.uploader.url",
+        "payload.release.assets.content_type",
+        "payload.release.assets.name",
+        "payload.release.assets.uploader.login",
+        "payload.release.assets.uploader.avatar_url",
+        "payload.release.assets.uploader.html_url",
+        "payload.release.assets.uploader.followers_url",
+        "payload.release.assets.uploader.events_url",
+        "payload.release.assets.uploader.type",
+        "payload.release.assets.id",
+        "payload.release.assets.label"
+    ]
+    for (List<String> col_meta in meta) {
+        if (col_meta[0] in array_cols) {
+            qt_sql "select sum(array_size(`${col_meta[0]}`)) from gharchive"
+        } else {
+            qt_sql "select count(`${col_meta[0]}`) from gharchive"
+        }
+    } 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org