You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/19 04:29:07 UTC

[doris] branch master updated: [improvement](serde) Optimizing the performance of mysql result writter (#20928)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 08fff8923f [improvement](serde) Optimizing the performance of mysql result writter (#20928)
08fff8923f is described below

commit 08fff8923f1245352bdc93dd337c31d3d1f525df
Author: Jerry Hu <mr...@gmail.com>
AuthorDate: Mon Jun 19 12:29:01 2023 +0800

    [improvement](serde) Optimizing the performance of mysql result writter (#20928)
    
    When converting query results into MySQL format, it involves transforming columnar data storage into row-based storage. This process raises the question of choosing between sequential reading and sequential writing. In reality, sequential writing is the better choice for performance optimization.
    
    Test with 9M rows contains more than 20 columns, this patch can reduce the conversion time from 20s to 11s.
---
 be/src/util/mysql_row_buffer.cpp                   |  19 ----
 be/src/util/mysql_row_buffer.h                     |   2 -
 be/src/vec/common/int_exp.h                        |   6 +-
 be/src/vec/core/types.h                            |  79 ++++++++++++++-
 .../vec/data_types/serde/data_type_array_serde.cpp |  73 ++++++++------
 .../vec/data_types/serde/data_type_array_serde.h   |  24 ++---
 .../data_types/serde/data_type_bitmap_serde.cpp    |  42 ++++----
 .../vec/data_types/serde/data_type_bitmap_serde.h  |  21 ++--
 .../data_types/serde/data_type_date64_serde.cpp    |  37 ++++---
 .../vec/data_types/serde/data_type_date64_serde.h  |  22 ++---
 .../serde/data_type_datetimev2_serde.cpp           |  41 ++++----
 .../data_types/serde/data_type_datetimev2_serde.h  |  21 ++--
 .../data_types/serde/data_type_datev2_serde.cpp    |  37 ++++---
 .../vec/data_types/serde/data_type_datev2_serde.h  |  22 ++---
 .../data_types/serde/data_type_decimal_serde.cpp   |  38 ++++++-
 .../vec/data_types/serde/data_type_decimal_serde.h |  53 +++-------
 .../serde/data_type_fixedlengthobject_serde.h      |  27 ++---
 .../vec/data_types/serde/data_type_hll_serde.cpp   |  42 ++++----
 be/src/vec/data_types/serde/data_type_hll_serde.h  |  21 ++--
 .../vec/data_types/serde/data_type_jsonb_serde.cpp |  37 ++++++-
 .../vec/data_types/serde/data_type_jsonb_serde.h   |  41 ++------
 .../vec/data_types/serde/data_type_map_serde.cpp   | 110 ++++++++++++---------
 be/src/vec/data_types/serde/data_type_map_serde.h  |  25 +++--
 .../data_types/serde/data_type_nullable_serde.cpp  |  37 ++++---
 .../data_types/serde/data_type_nullable_serde.h    |  23 ++---
 .../data_types/serde/data_type_number_serde.cpp    |  44 +++++++++
 .../vec/data_types/serde/data_type_number_serde.h  |  54 ++--------
 .../vec/data_types/serde/data_type_object_serde.h  |  12 +--
 .../serde/data_type_quantilestate_serde.h          |  63 ++++++------
 be/src/vec/data_types/serde/data_type_serde.h      |  15 +--
 .../data_types/serde/data_type_string_serde.cpp    |  52 ++++++----
 .../vec/data_types/serde/data_type_string_serde.h  |  21 ++--
 .../data_types/serde/data_type_struct_serde.cpp    |  82 ++++++++-------
 .../vec/data_types/serde/data_type_struct_serde.h  |  24 ++---
 .../vec/data_types/serde/data_type_time_serde.cpp  |  28 +++++-
 be/src/vec/data_types/serde/data_type_time_serde.h |  34 ++-----
 be/src/vec/sink/vmysql_result_writer.cpp           |  59 +++++------
 be/src/vec/sink/vmysql_result_writer.h             |   1 -
 38 files changed, 741 insertions(+), 648 deletions(-)

diff --git a/be/src/util/mysql_row_buffer.cpp b/be/src/util/mysql_row_buffer.cpp
index f346c2d43c..a8d2674073 100644
--- a/be/src/util/mysql_row_buffer.cpp
+++ b/be/src/util/mysql_row_buffer.cpp
@@ -80,25 +80,6 @@ MysqlRowBuffer<is_binary_format>::MysqlRowBuffer()
           _dynamic_mode(0),
           _len_pos(0) {}
 
-template <bool is_binary_format>
-MysqlRowBuffer<is_binary_format>::MysqlRowBuffer(MysqlRowBuffer<is_binary_format>&& other) {
-    if (other._buf == other._default_buf) {
-        auto other_length = other.length();
-        memcpy(_default_buf, other._buf, other_length);
-        _buf = _default_buf;
-        _pos = _buf + other_length;
-    } else {
-        _buf = other._buf;
-        other._buf = other._default_buf;
-        _pos = other._pos;
-    }
-    _buf_size = other._buf_size;
-    _dynamic_mode = other._dynamic_mode;
-    _field_count = other._field_count;
-    _field_pos = other._field_pos;
-    _len_pos = other._len_pos;
-}
-
 template <bool is_binary_format>
 void MysqlRowBuffer<is_binary_format>::start_binary_row(uint32_t num_cols) {
     assert(is_binary_format);
diff --git a/be/src/util/mysql_row_buffer.h b/be/src/util/mysql_row_buffer.h
index d0e91e766d..2df739450f 100644
--- a/be/src/util/mysql_row_buffer.h
+++ b/be/src/util/mysql_row_buffer.h
@@ -56,8 +56,6 @@ public:
     MysqlRowBuffer();
     ~MysqlRowBuffer();
 
-    MysqlRowBuffer(MysqlRowBuffer&& other);
-
     void reset() { _pos = _buf; }
 
     // Prepare for binary row buffer
diff --git a/be/src/vec/common/int_exp.h b/be/src/vec/common/int_exp.h
index 677ab98be4..cac7f24f04 100644
--- a/be/src/vec/common/int_exp.h
+++ b/be/src/vec/common/int_exp.h
@@ -66,15 +66,15 @@ inline uint64_t int_exp10(int x) {
 
 namespace common {
 
-inline std::int32_t exp10_i32(int x) {
+inline constexpr std::int32_t exp10_i32(int x) {
     return exp_details::get_exp<std::int32_t, 10, 10>(x);
 }
 
-inline std::int64_t exp10_i64(int x) {
+inline constexpr std::int64_t exp10_i64(int x) {
     return exp_details::get_exp<std::int64_t, 10, 19>(x);
 }
 
-inline __int128 exp10_i128(int x) {
+inline constexpr __int128 exp10_i128(int x) {
     return exp_details::get_exp<__int128, 10, 39>(x);
 }
 
diff --git a/be/src/vec/core/types.h b/be/src/vec/core/types.h
index 8f272fa6e0..eb21a17117 100644
--- a/be/src/vec/core/types.h
+++ b/be/src/vec/core/types.h
@@ -289,17 +289,17 @@ using DateTimeV2 = UInt64;
 struct Int128I {};
 
 template <typename T>
-inline T decimal_scale_multiplier(UInt32 scale);
+inline constexpr T decimal_scale_multiplier(UInt32 scale);
 template <>
-inline Int32 decimal_scale_multiplier<Int32>(UInt32 scale) {
+inline constexpr Int32 decimal_scale_multiplier<Int32>(UInt32 scale) {
     return common::exp10_i32(scale);
 }
 template <>
-inline Int64 decimal_scale_multiplier<Int64>(UInt32 scale) {
+inline constexpr Int64 decimal_scale_multiplier<Int64>(UInt32 scale) {
     return common::exp10_i64(scale);
 }
 template <>
-inline Int128 decimal_scale_multiplier<Int128>(UInt32 scale) {
+inline constexpr Int128 decimal_scale_multiplier<Int128>(UInt32 scale) {
     return common::exp10_i128(scale);
 }
 
@@ -371,6 +371,17 @@ struct Decimal {
         return *this;
     }
 
+    static constexpr int max_string_length() {
+        constexpr auto precision =
+                std::is_same_v<T, Int32>
+                        ? BeConsts::MAX_DECIMAL32_PRECISION
+                        : (std::is_same_v<T, Int64> ? BeConsts::MAX_DECIMAL64_PRECISION
+                                                    : BeConsts::MAX_DECIMAL128_PRECISION);
+        return precision + 1 // Add a space for decimal place
+               + 1           // Add a space for leading 0
+               + 1;          // Add a space for negative sign
+    }
+
     std::string to_string(UInt32 scale) const {
         if (value == std::numeric_limits<T>::min()) {
             fmt::memory_buffer buffer;
@@ -420,11 +431,71 @@ struct Decimal {
         return str;
     }
 
+    /**
+     * Got the string representation of a decimal.
+     * @param dst Store the result, should be pre-allocated.
+     * @param scale Decimal's scale.
+     * @param scale_multiplier Decimal's scale multiplier.
+     * @return The length of string.
+     */
+    __attribute__((always_inline)) size_t to_string(char* dst, UInt32 scale,
+                                                    const T& scale_multiplier) const {
+        if (UNLIKELY(value == std::numeric_limits<T>::min())) {
+            auto end = fmt::format_to(dst, "{}", value);
+            return end - dst;
+        }
+
+        bool is_negative = value < 0;
+        T abs_value = value;
+        int pos = 0;
+
+        if (is_negative) {
+            abs_value = -value;
+            dst[pos++] = '-';
+        }
+
+        T whole_part = abs_value;
+        T frac_part;
+        if (LIKELY(scale)) {
+            whole_part = abs_value / scale_multiplier;
+            frac_part = abs_value % scale_multiplier;
+        }
+        auto end = fmt::format_to(dst + pos, "{}", whole_part);
+        pos = end - dst;
+
+        if (LIKELY(scale)) {
+            int low_scale = 0;
+            int high_scale = scale;
+            while (low_scale < high_scale) {
+                int mid_scale = (high_scale + low_scale) >> 1;
+                const auto mid_scale_factor = decimal_scale_multiplier<T>(mid_scale);
+                if (mid_scale_factor <= frac_part) {
+                    low_scale = mid_scale + 1;
+                } else {
+                    high_scale = mid_scale;
+                }
+            }
+            dst[pos++] = '.';
+            if (low_scale < scale) {
+                memset(&dst[pos], '0', scale - low_scale);
+                pos += scale - low_scale;
+            }
+            if (frac_part) {
+                end = fmt::format_to(&dst[pos], "{}", frac_part);
+                pos = end - dst;
+            }
+        }
+
+        return pos;
+    }
+
     T value;
 };
 
 template <>
 struct Decimal<Int128I> : public Decimal<Int128> {
+    using NativeType = Int128;
+
     Decimal() = default;
 
 #define DECLARE_NUMERIC_CTOR(TYPE) \
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp b/be/src/vec/data_types/serde/data_type_array_serde.cpp
index 2d224cfeac..f3d0098238 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp
@@ -85,47 +85,60 @@ void DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const arrow::Ar
 }
 
 template <bool is_binary_format>
-Status DataTypeArraySerDe::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
-        bool col_const) const {
-    int buf_ret = 0;
+Status DataTypeArraySerDe::_write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<is_binary_format>& result,
+                                                  int row_idx, bool col_const) const {
     auto& column_array = assert_cast<const ColumnArray&>(column);
     auto& offsets = column_array.get_offsets();
     auto& data = column_array.get_data();
     bool is_nested_string = data.is_column_string();
-    for (ssize_t i = start; i < end; ++i) {
-        if (0 != buf_ret) {
-            return Status::InternalError("pack mysql buffer failed.");
+    const auto col_index = index_check_const(row_idx, col_const);
+    result.open_dynamic_mode();
+    if (0 != result.push_string("[", 1)) {
+        return Status::InternalError("pack mysql buffer failed.");
+    }
+    for (int j = offsets[col_index - 1]; j < offsets[col_index]; ++j) {
+        if (j != offsets[col_index - 1]) {
+            if (0 != result.push_string(", ", 2)) {
+                return Status::InternalError("pack mysql buffer failed.");
+            }
         }
-        const auto col_index = index_check_const(i, col_const);
-        result[row_idx].open_dynamic_mode();
-        buf_ret = result[row_idx].push_string("[", 1);
-        for (int j = offsets[col_index - 1]; j < offsets[col_index]; ++j) {
-            if (j != offsets[col_index - 1]) {
-                buf_ret = result[row_idx].push_string(", ", 2);
+        if (data.is_null_at(j)) {
+            if (0 != result.push_string("NULL", strlen("NULL"))) {
+                return Status::InternalError("pack mysql buffer failed.");
             }
-            if (data.is_null_at(j)) {
-                buf_ret = result[row_idx].push_string("NULL", strlen("NULL"));
-            } else {
-                if (is_nested_string) {
-                    buf_ret = result[row_idx].push_string("\"", 1);
-                    RETURN_IF_ERROR(nested_serde->write_column_to_mysql(
-                            data, return_object_data_as_binary, result, row_idx, j, j + 1,
-                            col_const));
-                    buf_ret = result[row_idx].push_string("\"", 1);
-                } else {
-                    RETURN_IF_ERROR(nested_serde->write_column_to_mysql(
-                            data, return_object_data_as_binary, result, row_idx, j, j + 1,
-                            col_const));
+        } else {
+            if (is_nested_string) {
+                if (0 != result.push_string("\"", 1)) {
+                    return Status::InternalError("pack mysql buffer failed.");
                 }
+                RETURN_IF_ERROR(nested_serde->write_column_to_mysql(data, result, j, col_const));
+                if (0 != result.push_string("\"", 1)) {
+                    return Status::InternalError("pack mysql buffer failed.");
+                }
+            } else {
+                RETURN_IF_ERROR(nested_serde->write_column_to_mysql(data, result, j, col_const));
             }
         }
-        buf_ret = result[row_idx].push_string("]", 1);
-        result[row_idx].close_dynamic_mode();
-        ++row_idx;
     }
+    if (0 != result.push_string("]", 1)) {
+        return Status::InternalError("pack mysql buffer failed.");
+    }
+    result.close_dynamic_mode();
     return Status::OK();
 }
+
+Status DataTypeArraySerDe::write_column_to_mysql(const IColumn& column,
+                                                 MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                 bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+Status DataTypeArraySerDe::write_column_to_mysql(const IColumn& column,
+                                                 MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                                 bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h b/be/src/vec/data_types/serde/data_type_array_serde.h
index 97c4625360..9576812ff1 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.h
+++ b/be/src/vec/data_types/serde/data_type_array_serde.h
@@ -56,25 +56,21 @@ public:
                                int end) const override;
     void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
                                 int end, const cctz::time_zone& ctz) const override;
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
 
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+
+    void set_return_object_as_string(bool value) override {
+        DataTypeSerDe::set_return_object_as_string(value);
+        nested_serde->set_return_object_as_string(value);
     }
 
 private:
     template <bool is_binary_format>
-    Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                  std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                  int row_idx, int start, int end, bool col_const) const;
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 
     DataTypeSerDeSPtr nested_serde;
 };
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
index 231175ff24..f93663efbd 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
@@ -83,30 +83,38 @@ void DataTypeBitMapSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbV
 }
 
 template <bool is_binary_format>
-Status DataTypeBitMapSerDe::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
-        bool col_const) const {
+Status DataTypeBitMapSerDe::_write_column_to_mysql(const IColumn& column,
+                                                   MysqlRowBuffer<is_binary_format>& result,
+                                                   int row_idx, bool col_const) const {
     auto& data_column = assert_cast<const ColumnBitmap&>(column);
-    int buf_ret = 0;
-    for (ssize_t i = start; i < end; ++i) {
-        if (0 != buf_ret) {
+    if (_return_object_as_string) {
+        const auto col_index = index_check_const(row_idx, col_const);
+        BitmapValue bitmapValue = data_column.get_element(col_index);
+        size_t size = bitmapValue.getSizeInBytes();
+        std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
+        bitmapValue.write_to(buf.get());
+        if (0 != result.push_string(buf.get(), size)) {
             return Status::InternalError("pack mysql buffer failed.");
         }
-        if (return_object_data_as_binary) {
-            const auto col_index = index_check_const(i, col_const);
-            BitmapValue bitmapValue = data_column.get_element(col_index);
-            size_t size = bitmapValue.getSizeInBytes();
-            std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
-            bitmapValue.write_to(buf.get());
-            buf_ret = result[row_idx].push_string(buf.get(), size);
-        } else {
-            buf_ret = result[row_idx].push_null();
+    } else {
+        if (0 != result.push_null()) {
+            return Status::InternalError("pack mysql buffer failed.");
         }
-        ++row_idx;
     }
     return Status::OK();
 }
 
+Status DataTypeBitMapSerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                  bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+Status DataTypeBitMapSerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                                  bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
index 378a1bb8e9..0825e8f298 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
@@ -50,26 +50,17 @@ public:
                                 int end, const cctz::time_zone& ctz) const override {
         LOG(FATAL) << "Not support read bitmap column from arrow";
     }
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
 
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
 
 private:
     // Bitmap is binary data which is not shown by mysql.
     template <bool is_binary_format>
-    Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                  std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                  int row_idx, int start, int end, bool col_const) const;
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
index a35c78aa24..787a01f250 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
@@ -110,23 +110,30 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const arrow::A
 }
 
 template <bool is_binary_format>
-Status DataTypeDate64SerDe::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
-        bool col_const) const {
+Status DataTypeDate64SerDe::_write_column_to_mysql(const IColumn& column,
+                                                   MysqlRowBuffer<is_binary_format>& result,
+                                                   int row_idx, bool col_const) const {
     auto& data = assert_cast<const ColumnVector<Int64>&>(column).get_data();
-    int buf_ret = 0;
-    for (ssize_t i = start; i < end; ++i) {
-        if (0 != buf_ret) {
-            return Status::InternalError("pack mysql buffer failed.");
-        }
-        const auto col_index = index_check_const(i, col_const);
-        auto time_num = data[col_index];
-        VecDateTimeValue time_val = binary_cast<Int64, VecDateTimeValue>(time_num);
-        buf_ret = result[row_idx].push_vec_datetime(time_val);
-        ++row_idx;
+    const auto col_index = index_check_const(row_idx, col_const);
+    auto time_num = data[col_index];
+    VecDateTimeValue time_val = binary_cast<Int64, VecDateTimeValue>(time_num);
+    if (UNLIKELY(0 != result.push_vec_datetime(time_val))) {
+        return Status::InternalError("pack mysql buffer failed.");
     }
     return Status::OK();
 }
+
+Status DataTypeDate64SerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                  bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+Status DataTypeDate64SerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                                  bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 } // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.h b/be/src/vec/data_types/serde/data_type_date64_serde.h
index 205d3f8009..c7f29fc118 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.h
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.h
@@ -47,25 +47,15 @@ class DataTypeDate64SerDe : public DataTypeNumberSerDe<Int64> {
                                int end) const override;
     void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
                                 int end, const cctz::time_zone& ctz) const override;
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
-
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
 
 private:
     template <bool is_binary_format>
-    Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                  std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                  int row_idx, int start, int end, bool col_const) const;
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 };
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
index 6f227969f9..0edfc0ae41 100644
--- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
@@ -46,27 +46,34 @@ void DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column, const
         }
     }
 }
+
 template <bool is_binary_format>
-Status DataTypeDateTimeV2SerDe::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
-        bool col_const) const {
+Status DataTypeDateTimeV2SerDe::_write_column_to_mysql(const IColumn& column,
+                                                       MysqlRowBuffer<is_binary_format>& result,
+                                                       int row_idx, bool col_const) const {
     auto& data = assert_cast<const ColumnVector<UInt64>&>(column).get_data();
-    int buf_ret = 0;
-    for (ssize_t i = start; i < end; ++i) {
-        if (0 != buf_ret) {
-            return Status::InternalError("pack mysql buffer failed.");
-        }
-        const auto col_index = index_check_const(i, col_const);
-        auto time_num = data[col_index];
-        char buf[64];
-        DateV2Value<DateTimeV2ValueType> date_val =
-                binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(time_num);
-        char* pos = date_val.to_string(buf, scale);
-        buf_ret = result[row_idx].push_string(buf, pos - buf - 1);
-        ++row_idx;
+    const auto col_index = index_check_const(row_idx, col_const);
+    char buf[64];
+    DateV2Value<DateTimeV2ValueType> date_val =
+            binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(data[col_index]);
+    char* pos = date_val.to_string(buf, scale);
+    if (UNLIKELY(0 != result.push_string(buf, pos - buf - 1))) {
+        return Status::InternalError("pack mysql buffer failed.");
     }
     return Status::OK();
 }
+
+Status DataTypeDateTimeV2SerDe::write_column_to_mysql(const IColumn& column,
+                                                      MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                      bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+Status DataTypeDateTimeV2SerDe::write_column_to_mysql(const IColumn& column,
+                                                      MysqlRowBuffer<false>& row_buffer,
+                                                      int row_idx, bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.h b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
index c92120480a..a571ac10ab 100644
--- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
@@ -51,25 +51,16 @@ public:
                                 int end, const cctz::time_zone& ctz) const override {
         LOG(FATAL) << "not support read arrow array to uint64 column";
     }
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
 
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
 
 private:
     template <bool is_binary_format>
-    Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                  std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                  int row_idx, int start, int end, bool col_const) const;
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
     int scale;
 };
 } // namespace vectorized
diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
index ea75fdc641..fe75eb38cd 100644
--- a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
@@ -65,25 +65,32 @@ void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow::A
         col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>, UInt32>(v));
     }
 }
+
 template <bool is_binary_format>
-Status DataTypeDateV2SerDe::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
-        bool col_const) const {
+Status DataTypeDateV2SerDe::_write_column_to_mysql(const IColumn& column,
+                                                   MysqlRowBuffer<is_binary_format>& result,
+                                                   int row_idx, bool col_const) const {
     auto& data = assert_cast<const ColumnVector<UInt32>&>(column).get_data();
-    int buf_ret = 0;
-    for (ssize_t i = start; i < end; ++i) {
-        if (0 != buf_ret) {
-            return Status::InternalError("pack mysql buffer failed.");
-        }
-        const auto col_index = index_check_const(i, col_const);
-        auto time_num = data[col_index];
-        DateV2Value<DateV2ValueType> date_val =
-                binary_cast<UInt32, DateV2Value<DateV2ValueType>>(time_num);
-        buf_ret = result[row_idx].push_vec_datetime(date_val);
-        ++row_idx;
+    auto col_index = index_check_const(row_idx, col_const);
+    DateV2Value<DateV2ValueType> date_val =
+            binary_cast<UInt32, DateV2Value<DateV2ValueType>>(data[col_index]);
+    if (UNLIKELY(0 != result.push_vec_datetime(date_val))) {
+        return Status::InternalError("pack mysql buffer failed.");
     }
     return Status::OK();
 }
+
+Status DataTypeDateV2SerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                  bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+Status DataTypeDateV2SerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                                  bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.h b/be/src/vec/data_types/serde/data_type_datev2_serde.h
index 934c00e6ee..be435ff80d 100644
--- a/be/src/vec/data_types/serde/data_type_datev2_serde.h
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.h
@@ -47,25 +47,15 @@ class DataTypeDateV2SerDe : public DataTypeNumberSerDe<UInt32> {
                                int end) const override;
     void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
                                 int end, const cctz::time_zone& ctz) const override;
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
-
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
 
 private:
     template <bool is_binary_format>
-    Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                  std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                  int row_idx, int start, int end, bool col_const) const;
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 };
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
index fca373a40a..d4b2de6281 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
@@ -145,9 +145,45 @@ void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
     }
 }
 
+template <typename T>
+template <bool is_binary_format>
+Status DataTypeDecimalSerDe<T>::_write_column_to_mysql(const IColumn& column,
+                                                       MysqlRowBuffer<is_binary_format>& result,
+                                                       int row_idx, bool col_const) const {
+    auto& data = assert_cast<const ColumnDecimal<T>&>(column).get_data();
+    const auto col_index = index_check_const(row_idx, col_const);
+    if constexpr (IsDecimalV2<T>) {
+        DecimalV2Value decimal_val(data[col_index]);
+        auto decimal_str = decimal_val.to_string(scale);
+        if (UNLIKELY(0 != result.push_string(decimal_str.c_str(), decimal_str.size()))) {
+            return Status::InternalError("pack mysql buffer failed.");
+        }
+    } else {
+        auto length = data[col_index].to_string(buf, scale, scale_multiplier);
+        if (UNLIKELY(0 != result.push_string(buf, length))) {
+            return Status::InternalError("pack mysql buffer failed.");
+        }
+    }
+    return Status::OK();
+}
+
+template <typename T>
+Status DataTypeDecimalSerDe<T>::write_column_to_mysql(const IColumn& column,
+                                                      MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                      bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+template <typename T>
+Status DataTypeDecimalSerDe<T>::write_column_to_mysql(const IColumn& column,
+                                                      MysqlRowBuffer<false>& row_buffer,
+                                                      int row_idx, bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 template class DataTypeDecimalSerDe<Decimal32>;
 template class DataTypeDecimalSerDe<Decimal64>;
 template class DataTypeDecimalSerDe<Decimal128>;
 template class DataTypeDecimalSerDe<Decimal128I>;
 } // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.h b/be/src/vec/data_types/serde/data_type_decimal_serde.h
index a082490e51..60014a99cf 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.h
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.h
@@ -47,7 +47,9 @@ class DataTypeDecimalSerDe : public DataTypeSerDe {
     static_assert(IsDecimalNumber<T>);
 
 public:
-    DataTypeDecimalSerDe(int scale) : scale(scale) {};
+    DataTypeDecimalSerDe(int scale_)
+            : scale(scale_),
+              scale_multiplier(decimal_scale_multiplier<typename T::NativeType>(scale)) {}
 
     Status write_column_to_pb(const IColumn& column, PValues& result, int start,
                               int end) const override;
@@ -63,54 +65,21 @@ public:
                                int end) const override;
     void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
                                 int end, const cctz::time_zone& ctz) const override;
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
-
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
 
 private:
     template <bool is_binary_format>
-    Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                  std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                  int row_idx, int start, int end, bool col_const) const;
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 
     int scale;
+    const T::NativeType scale_multiplier;
+    mutable char buf[T::max_string_length()];
 };
 
-template <typename T>
-template <bool is_binary_format>
-Status DataTypeDecimalSerDe<T>::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
-        bool col_const) const {
-    int buf_ret = 0;
-    auto& data = assert_cast<const ColumnDecimal<T>&>(column).get_data();
-    for (int i = start; i < end; ++i) {
-        if (0 != buf_ret) {
-            return Status::InternalError("pack mysql buffer failed.");
-        }
-        const auto col_index = index_check_const(i, col_const);
-        if constexpr (IsDecimalV2<T>) {
-            DecimalV2Value decimal_val(data[col_index]);
-            auto decimal_str = decimal_val.to_string(scale);
-            buf_ret = result[row_idx].push_string(decimal_str.c_str(), decimal_str.size());
-        } else {
-            std::string decimal_str = data[col_index].to_string(scale);
-            buf_ret = result[row_idx].push_string(decimal_str.c_str(), decimal_str.size());
-        }
-        ++row_idx;
-    }
-    return Status::OK();
-}
 template <typename T>
 Status DataTypeDecimalSerDe<T>::write_column_to_pb(const IColumn& column, PValues& result,
                                                    int start, int end) const {
diff --git a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h
index 3c714d2cab..092d2202b0 100644
--- a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h
+++ b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h
@@ -60,26 +60,15 @@ public:
                                 int end, const cctz::time_zone& ctz) const override {
         LOG(FATAL) << "Not support read FixedLengthObject column from arrow";
     }
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        for (ssize_t i = start; i < end; ++i) {
-            // 0x01 is a magic num, not useful actually, just for present ""
-            char* tmp_val = reinterpret_cast<char*>(0x01);
-            result[row_idx].push_string(tmp_val, 0);
-        }
-        return Status::OK();
+
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override {
+        LOG(FATAL) << "Not support write object column to mysql";
     }
-    // just print empty when type is column fixed length
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        for (ssize_t i = start; i < end; ++i) {
-            // 0x01 is a magic num, not useful actually, just for present ""
-            char* tmp_val = reinterpret_cast<char*>(0x01);
-            result[row_idx].push_string(tmp_val, 0);
-        }
-        return Status::OK();
+
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override {
+        LOG(FATAL) << "Not support write object column to mysql";
     }
 };
 } // namespace vectorized
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
index aadb6e2289..2c32ceaccc 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
@@ -102,30 +102,38 @@ void DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const UInt8*
 }
 
 template <bool is_binary_format>
-Status DataTypeHLLSerDe::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
-        bool col_const) const {
+Status DataTypeHLLSerDe::_write_column_to_mysql(const IColumn& column,
+                                                MysqlRowBuffer<is_binary_format>& result,
+                                                int row_idx, bool col_const) const {
     auto& data_column = assert_cast<const ColumnHLL&>(column);
-    int buf_ret = 0;
-    for (ssize_t i = start; i < end; ++i) {
-        if (0 != buf_ret) {
+    if (_return_object_as_string) {
+        const auto col_index = index_check_const(row_idx, col_const);
+        HyperLogLog hyperLogLog = data_column.get_element(col_index);
+        size_t size = hyperLogLog.max_serialized_size();
+        std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
+        hyperLogLog.serialize((uint8*)buf.get());
+        if (UNLIKELY(0 != result.push_string(buf.get(), size))) {
             return Status::InternalError("pack mysql buffer failed.");
         }
-        if (return_object_data_as_binary) {
-            const auto col_index = index_check_const(i, col_const);
-            HyperLogLog hyperLogLog = data_column.get_element(col_index);
-            size_t size = hyperLogLog.max_serialized_size();
-            std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
-            hyperLogLog.serialize((uint8*)buf.get());
-            buf_ret = result[row_idx].push_string(buf.get(), size);
-        } else {
-            buf_ret = result[row_idx].push_null();
+    } else {
+        if (UNLIKELY(0 != result.push_null())) {
+            return Status::InternalError("pack mysql buffer failed.");
         }
-        ++row_idx;
     }
     return Status::OK();
 }
 
+Status DataTypeHLLSerDe::write_column_to_mysql(const IColumn& column,
+                                               MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                               bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+Status DataTypeHLLSerDe::write_column_to_mysql(const IColumn& column,
+                                               MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                               bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.h b/be/src/vec/data_types/serde/data_type_hll_serde.h
index c921e55e5f..7decc40134 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.h
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.h
@@ -48,26 +48,17 @@ public:
                                 int end, const cctz::time_zone& ctz) const override {
         LOG(FATAL) << "Not support read hll column from arrow";
     }
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
 
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
 
 private:
     // Hll is binary data which is not shown by mysql.
     template <bool is_binary_format>
-    Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                  std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                  int row_idx, int start, int end, bool col_const) const;
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
index bc91d7cce3..03d0038ee0 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
@@ -17,5 +17,40 @@
 
 #include "data_type_jsonb_serde.h"
 namespace doris {
-namespace vectorized {} // namespace vectorized
+namespace vectorized {
+
+template <bool is_binary_format>
+Status DataTypeJsonbSerDe::_write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<is_binary_format>& result,
+                                                  int row_idx, bool col_const) const {
+    auto& data = assert_cast<const ColumnString&>(column);
+    const auto col_index = index_check_const(row_idx, col_const);
+    const auto jsonb_val = data.get_data_at(col_index);
+    // jsonb size == 0 is NULL
+    if (jsonb_val.data == nullptr || jsonb_val.size == 0) {
+        if (UNLIKELY(0 != result.push_null())) {
+            return Status::InternalError("pack mysql buffer failed.");
+        }
+    } else {
+        std::string json_str = JsonbToJson::jsonb_to_json_string(jsonb_val.data, jsonb_val.size);
+        if (UNLIKELY(0 != result.push_string(json_str.c_str(), json_str.size()))) {
+            return Status::InternalError("pack mysql buffer failed.");
+        }
+    }
+    return Status::OK();
+}
+
+Status DataTypeJsonbSerDe::write_column_to_mysql(const IColumn& column,
+                                                 MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                 bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+Status DataTypeJsonbSerDe::write_column_to_mysql(const IColumn& column,
+                                                 MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                                 bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+} // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.h b/be/src/vec/data_types/serde/data_type_jsonb_serde.h
index 2eb52db5e1..e2d7b8c7c0 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.h
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.h
@@ -34,44 +34,15 @@ namespace vectorized {
 class Arena;
 
 class DataTypeJsonbSerDe : public DataTypeStringSerDe {
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_jsonb_column_to_mysql(column, return_object_data_as_binary, result, row_idx,
-                                            start, end, col_const);
-    }
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_jsonb_column_to_mysql(column, return_object_data_as_binary, result, row_idx,
-                                            start, end, col_const);
-    }
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
 
 private:
     template <bool is_binary_format>
-    Status _write_jsonb_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                        std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                        int row_idx, int start, int end, bool col_const) const {
-        int buf_ret = 0;
-        auto& data = assert_cast<const ColumnString&>(column);
-        for (int i = start; i < end; ++i) {
-            if (0 != buf_ret) {
-                return Status::InternalError("pack mysql buffer failed.");
-            }
-            const auto col_index = index_check_const(i, col_const);
-            const auto jsonb_val = data.get_data_at(col_index);
-            // jsonb size == 0 is NULL
-            if (jsonb_val.data == nullptr || jsonb_val.size == 0) {
-                buf_ret = result[row_idx].push_null();
-            } else {
-                std::string json_str =
-                        JsonbToJson::jsonb_to_json_string(jsonb_val.data, jsonb_val.size);
-                buf_ret = result[row_idx].push_string(json_str.c_str(), json_str.size());
-            }
-            ++row_idx;
-        }
-        return Status::OK();
-    }
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 };
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_map_serde.cpp b/be/src/vec/data_types/serde/data_type_map_serde.cpp
index 5b450081b3..187c3ffc15 100644
--- a/be/src/vec/data_types/serde/data_type_map_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_map_serde.cpp
@@ -54,67 +54,89 @@ void DataTypeMapSerDe::read_column_from_arrow(IColumn& column, const arrow::Arra
                                               const cctz::time_zone& ctz) const {
     LOG(FATAL) << "Not support read " << column.get_name() << " from arrow";
 }
+
 template <bool is_binary_format>
-Status DataTypeMapSerDe::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
-        bool col_const) const {
-    int buf_ret = 0;
+Status DataTypeMapSerDe::_write_column_to_mysql(const IColumn& column,
+                                                MysqlRowBuffer<is_binary_format>& result,
+                                                int row_idx, bool col_const) const {
     auto& map_column = assert_cast<const ColumnMap&>(column);
     const IColumn& nested_keys_column = map_column.get_keys();
     const IColumn& nested_values_column = map_column.get_values();
     bool is_key_string = remove_nullable(nested_keys_column.get_ptr())->is_column_string();
     bool is_val_string = remove_nullable(nested_values_column.get_ptr())->is_column_string();
 
+    const auto col_index = index_check_const(row_idx, col_const);
+    result.open_dynamic_mode();
+    if (0 != result.push_string("{", 1)) {
+        return Status::InternalError("pack mysql buffer failed.");
+    }
     auto& offsets = map_column.get_offsets();
-    for (ssize_t i = start; i < end; ++i) {
-        if (0 != buf_ret) {
-            return Status::InternalError("pack mysql buffer failed.");
+    for (auto j = offsets[col_index - 1]; j < offsets[col_index]; ++j) {
+        if (j != offsets[col_index - 1]) {
+            if (0 != result.push_string(", ", 2)) {
+                return Status::InternalError("pack mysql buffer failed.");
+            }
         }
-        const auto col_index = index_check_const(i, col_const);
-        result[row_idx].open_dynamic_mode();
-        buf_ret = result[row_idx].push_string("{", 1);
-        for (auto j = offsets[col_index - 1]; j < offsets[col_index]; ++j) {
-            if (j != offsets[col_index - 1]) {
-                buf_ret = result[row_idx].push_string(", ", 2);
+        if (nested_keys_column.is_null_at(j)) {
+            if (0 != result.push_string("NULL", strlen("NULL"))) {
+                return Status::InternalError("pack mysql buffer failed.");
             }
-            if (nested_keys_column.is_null_at(j)) {
-                buf_ret = result[row_idx].push_string("NULL", strlen("NULL"));
-            } else {
-                if (is_key_string) {
-                    buf_ret = result[row_idx].push_string("\"", 1);
-                    RETURN_IF_ERROR(key_serde->write_column_to_mysql(
-                            nested_keys_column, return_object_data_as_binary, result, row_idx, j,
-                            j + 1, col_const));
-                    buf_ret = result[row_idx].push_string("\"", 1);
-                } else {
-                    RETURN_IF_ERROR(key_serde->write_column_to_mysql(
-                            nested_keys_column, return_object_data_as_binary, result, row_idx, j,
-                            j + 1, col_const));
+        } else {
+            if (is_key_string) {
+                if (0 != result.push_string("\"", 1)) {
+                    return Status::InternalError("pack mysql buffer failed.");
+                }
+                RETURN_IF_ERROR(
+                        key_serde->write_column_to_mysql(nested_keys_column, result, j, col_const));
+                if (0 != result.push_string("\"", 1)) {
+                    return Status::InternalError("pack mysql buffer failed.");
                 }
-            }
-            buf_ret = result[row_idx].push_string(":", 1);
-            if (nested_values_column.is_null_at(j)) {
-                buf_ret = result[row_idx].push_string("NULL", strlen("NULL"));
             } else {
-                if (is_val_string) {
-                    buf_ret = result[row_idx].push_string("\"", 1);
-                    RETURN_IF_ERROR(value_serde->write_column_to_mysql(
-                            nested_values_column, return_object_data_as_binary, result, row_idx, j,
-                            j + 1, col_const));
-                    buf_ret = result[row_idx].push_string("\"", 1);
-                } else {
-                    RETURN_IF_ERROR(value_serde->write_column_to_mysql(
-                            nested_values_column, return_object_data_as_binary, result, row_idx, j,
-                            j + 1, col_const));
+                RETURN_IF_ERROR(
+                        key_serde->write_column_to_mysql(nested_keys_column, result, j, col_const));
+            }
+        }
+        if (0 != result.push_string(":", 1)) {
+            return Status::InternalError("pack mysql buffer failed.");
+        }
+        if (nested_values_column.is_null_at(j)) {
+            if (0 != result.push_string("NULL", strlen("NULL"))) {
+                return Status::InternalError("pack mysql buffer failed.");
+            }
+        } else {
+            if (is_val_string) {
+                if (0 != result.push_string("\"", 1)) {
+                    return Status::InternalError("pack mysql buffer failed.");
+                }
+                RETURN_IF_ERROR(value_serde->write_column_to_mysql(nested_values_column, result, j,
+                                                                   col_const));
+                if (0 != result.push_string("\"", 1)) {
+                    return Status::InternalError("pack mysql buffer failed.");
                 }
+            } else {
+                RETURN_IF_ERROR(value_serde->write_column_to_mysql(nested_values_column, result, j,
+                                                                   col_const));
             }
         }
-        buf_ret = result[row_idx].push_string("}", 1);
-        result[row_idx].close_dynamic_mode();
-        ++row_idx;
     }
+    if (0 != result.push_string("}", 1)) {
+        return Status::InternalError("pack mysql buffer failed.");
+    }
+    result.close_dynamic_mode();
     return Status::OK();
 }
+
+Status DataTypeMapSerDe::write_column_to_mysql(const IColumn& column,
+                                               MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                               bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+Status DataTypeMapSerDe::write_column_to_mysql(const IColumn& column,
+                                               MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                               bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_map_serde.h b/be/src/vec/data_types/serde/data_type_map_serde.h
index 21e2c4dacc..f41f75e1e8 100644
--- a/be/src/vec/data_types/serde/data_type_map_serde.h
+++ b/be/src/vec/data_types/serde/data_type_map_serde.h
@@ -55,25 +55,22 @@ public:
                                int end) const override;
     void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
                                 int end, const cctz::time_zone& ctz) const override;
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
 
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+
+    void set_return_object_as_string(bool value) override {
+        DataTypeSerDe::set_return_object_as_string(value);
+        key_serde->set_return_object_as_string(value);
+        value_serde->set_return_object_as_string(value);
     }
 
 private:
     template <bool is_binary_format>
-    Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                  std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                  int row_idx, int start, int end, bool col_const) const;
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 
     DataTypeSerDeSPtr key_serde;
     DataTypeSerDeSPtr value_serde;
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
index 736e1b5f56..f2938450aa 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
@@ -133,29 +133,34 @@ void DataTypeNullableSerDe::read_column_from_arrow(IColumn& column, const arrow:
 }
 
 template <bool is_binary_format>
-Status DataTypeNullableSerDe::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
-        bool col_const) const {
-    int buf_ret = 0;
+Status DataTypeNullableSerDe::_write_column_to_mysql(const IColumn& column,
+                                                     MysqlRowBuffer<is_binary_format>& result,
+                                                     int row_idx, bool col_const) const {
     auto& col = static_cast<const ColumnNullable&>(column);
     auto& nested_col = col.get_nested_column();
-    for (ssize_t i = start; i < end; ++i) {
-        if (0 != buf_ret) {
+    const auto col_index = index_check_const(row_idx, col_const);
+    if (col.has_null() && col.is_null_at(col_index)) {
+        if (UNLIKELY(0 != result.push_null())) {
             return Status::InternalError("pack mysql buffer failed.");
         }
-        const auto col_index = index_check_const(i, col_const);
-        if (col.is_null_at(col_index)) {
-            buf_ret = result[row_idx].push_null();
-        } else {
-            RETURN_IF_ERROR(nested_serde->write_column_to_mysql(
-                    nested_col, return_object_data_as_binary, result, row_idx, col_index,
-                    col_index + 1, col_const));
-        }
-        ++row_idx;
+    } else {
+        RETURN_IF_ERROR(
+                nested_serde->write_column_to_mysql(nested_col, result, col_index, col_const));
     }
     return Status::OK();
 }
 
+Status DataTypeNullableSerDe::write_column_to_mysql(const IColumn& column,
+                                                    MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                    bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+Status DataTypeNullableSerDe::write_column_to_mysql(const IColumn& column,
+                                                    MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                                    bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h b/be/src/vec/data_types/serde/data_type_nullable_serde.h
index 6f05d002a2..170dc8e572 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.h
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h
@@ -48,25 +48,20 @@ public:
                                int end) const override;
     void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
                                 int end, const cctz::time_zone& ctz) const override;
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
 
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
+    void set_return_object_as_string(bool value) override {
+        DataTypeSerDe::set_return_object_as_string(value);
+        nested_serde->set_return_object_as_string(value);
     }
 
 private:
     template <bool is_binary_format>
-    Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                  std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                  int row_idx, int start, int end, bool col_const) const;
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 
     DataTypeSerDeSPtr nested_serde;
 };
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index d8736d30a8..aac945ebf4 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -113,6 +113,50 @@ void DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
     col_data.insert(raw_data, raw_data + row_count);
 }
 
+template <typename T>
+template <bool is_binary_format>
+Status DataTypeNumberSerDe<T>::_write_column_to_mysql(const IColumn& column,
+                                                      MysqlRowBuffer<is_binary_format>& result,
+                                                      int row_idx, bool col_const) const {
+    int buf_ret = 0;
+    auto& data = assert_cast<const ColumnType&>(column).get_data();
+    const auto col_index = index_check_const(row_idx, col_const);
+    if constexpr (std::is_same_v<T, Int8> || std::is_same_v<T, UInt8>) {
+        buf_ret = result.push_tinyint(data[col_index]);
+    } else if constexpr (std::is_same_v<T, Int16> || std::is_same_v<T, UInt16>) {
+        buf_ret = result.push_smallint(data[col_index]);
+    } else if constexpr (std::is_same_v<T, Int32> || std::is_same_v<T, UInt32>) {
+        buf_ret = result.push_int(data[col_index]);
+    } else if constexpr (std::is_same_v<T, Int64> || std::is_same_v<T, UInt64>) {
+        buf_ret = result.push_bigint(data[col_index]);
+    } else if constexpr (std::is_same_v<T, Int128>) {
+        buf_ret = result.push_largeint(data[col_index]);
+    } else if constexpr (std::is_same_v<T, float>) {
+        buf_ret = result.push_float(data[col_index]);
+    } else if constexpr (std::is_same_v<T, double>) {
+        buf_ret = result.push_double(data[col_index]);
+    }
+    if (UNLIKELY(buf_ret != 0)) {
+        return Status::InternalError("pack mysql buffer failed.");
+    } else {
+        return Status::OK();
+    }
+}
+
+template <typename T>
+Status DataTypeNumberSerDe<T>::write_column_to_mysql(const IColumn& column,
+                                                     MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                     bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+template <typename T>
+Status DataTypeNumberSerDe<T>::write_column_to_mysql(const IColumn& column,
+                                                     MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                                     bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 /// Explicit template instantiations - to avoid code bloat in headers.
 template class DataTypeNumberSerDe<UInt8>;
 template class DataTypeNumberSerDe<UInt16>;
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.h b/be/src/vec/data_types/serde/data_type_number_serde.h
index e301281a6e..7e1f22874d 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.h
+++ b/be/src/vec/data_types/serde/data_type_number_serde.h
@@ -68,60 +68,18 @@ public:
                                int end) const override;
     void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
                                 int end, const cctz::time_zone& ctz) const override;
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
 
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
 
 private:
     template <bool is_binary_format>
-    Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                  std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                  int row_idx, int start, int end, bool col_const) const;
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 };
 
-template <typename T>
-template <bool is_binary_format>
-Status DataTypeNumberSerDe<T>::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
-        bool col_const) const {
-    int buf_ret = 0;
-    auto& data = assert_cast<const ColumnType&>(column).get_data();
-    for (auto i = start; i < end; ++i) {
-        if (0 != buf_ret) {
-            return Status::InternalError("pack mysql buffer failed.");
-        }
-        const auto col_index = index_check_const(i, col_const);
-        if constexpr (std::is_same_v<T, Int8> || std::is_same_v<T, UInt8>) {
-            buf_ret = result[row_idx].push_tinyint(data[col_index]);
-        } else if constexpr (std::is_same_v<T, Int16> || std::is_same_v<T, UInt16>) {
-            buf_ret = result[row_idx].push_smallint(data[col_index]);
-        } else if constexpr (std::is_same_v<T, Int32> || std::is_same_v<T, UInt32>) {
-            buf_ret = result[row_idx].push_int(data[col_index]);
-        } else if constexpr (std::is_same_v<T, Int64> || std::is_same_v<T, UInt64>) {
-            buf_ret = result[row_idx].push_bigint(data[col_index]);
-        } else if constexpr (std::is_same_v<T, Int128>) {
-            buf_ret = result[row_idx].push_largeint(data[col_index]);
-        } else if constexpr (std::is_same_v<T, float>) {
-            buf_ret = result[row_idx].push_float(data[col_index]);
-        } else if constexpr (std::is_same_v<T, double>) {
-            buf_ret = result[row_idx].push_double(data[col_index]);
-        }
-        ++row_idx;
-    }
-    return Status::OK();
-}
-
 template <typename T>
 Status DataTypeNumberSerDe<T>::read_column_from_pb(IColumn& column, const PValues& arg) const {
     if constexpr (std::is_same_v<T, UInt8> || std::is_same_v<T, UInt16> ||
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h
index 919764b4c7..6ffbd07eb9 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.h
+++ b/be/src/vec/data_types/serde/data_type_object_serde.h
@@ -61,14 +61,14 @@ public:
                                 int end, const cctz::time_zone& ctz) const override {
         LOG(FATAL) << "Not support read object column from arrow";
     }
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
+
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override {
         LOG(FATAL) << "Not support write object column to mysql";
     }
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
+
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override {
         LOG(FATAL) << "Not support write object column to mysql";
     }
 };
diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
index ac1a5ea500..c9126ab298 100644
--- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
+++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
@@ -57,25 +57,16 @@ public:
                                 int end, const cctz::time_zone& ctz) const override {
         LOG(FATAL) << "Not support read " << column.get_name() << " from arrow";
     }
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
 
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
 
 private:
     template <bool is_binary_format>
-    Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                  std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                  int row_idx, int start, int end, bool col_const) const;
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 };
 
 template <typename T>
@@ -127,30 +118,40 @@ void DataTypeQuantileStateSerDe<T>::read_one_cell_from_jsonb(IColumn& column,
 template <typename T>
 template <bool is_binary_format>
 Status DataTypeQuantileStateSerDe<T>::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
+        const IColumn& column, MysqlRowBuffer<is_binary_format>& result, int row_idx,
         bool col_const) const {
     auto& data_column = reinterpret_cast<const ColumnQuantileState<T>&>(column);
-    int buf_ret = 0;
-    for (ssize_t i = start; i < end; ++i) {
-        if (0 != buf_ret) {
+
+    if (_return_object_as_string) {
+        const auto col_index = index_check_const(row_idx, col_const);
+        auto& quantile_value = const_cast<QuantileState<T>&>(data_column.get_element(col_index));
+        size_t size = quantile_value.get_serialized_size();
+        std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
+        quantile_value.serialize((uint8_t*)buf.get());
+        if (0 != result.push_string(buf.get(), size)) {
             return Status::InternalError("pack mysql buffer failed.");
         }
-        if (return_object_data_as_binary) {
-            const auto col_index = index_check_const(i, col_const);
-            auto& quantile_value =
-                    const_cast<QuantileState<T>&>(data_column.get_element(col_index));
-            size_t size = quantile_value.get_serialized_size();
-            std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
-            quantile_value.serialize((uint8_t*)buf.get());
-            buf_ret = result[row_idx].push_string(buf.get(), size);
-        } else {
-            buf_ret = result[row_idx].push_null();
+    } else {
+        if (0 != result.push_null()) {
+            return Status::InternalError("pack mysql buffer failed.");
         }
-        ++row_idx;
     }
     return Status::OK();
 }
 
+template <typename T>
+Status DataTypeQuantileStateSerDe<T>::write_column_to_mysql(const IColumn& column,
+                                                            MysqlRowBuffer<true>& row_buffer,
+                                                            int row_idx, bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+template <typename T>
+Status DataTypeQuantileStateSerDe<T>::write_column_to_mysql(const IColumn& column,
+                                                            MysqlRowBuffer<false>& row_buffer,
+                                                            int row_idx, bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h
index 6910ac202e..7064e308c9 100644
--- a/be/src/vec/data_types/serde/data_type_serde.h
+++ b/be/src/vec/data_types/serde/data_type_serde.h
@@ -74,13 +74,11 @@ public:
     virtual void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const = 0;
 
     // MySQL serializer and deserializer
-    virtual Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                         std::vector<MysqlRowBuffer<false>>& result, int row_idx,
-                                         int start, int end, bool col_const) const = 0;
+    virtual Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                         int row_idx, bool col_const) const = 0;
 
-    virtual Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                         std::vector<MysqlRowBuffer<true>>& result, int start,
-                                         int row_idx, int end, bool col_const) const = 0;
+    virtual Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                         int row_idx, bool col_const) const = 0;
     // Thrift serializer and deserializer
 
     // ORC serializer and deserializer
@@ -95,6 +93,11 @@ public:
                                        int end) const = 0;
     virtual void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
                                         int end, const cctz::time_zone& ctz) const = 0;
+
+    virtual void set_return_object_as_string(bool value) { _return_object_as_string = value; }
+
+protected:
+    bool _return_object_as_string = false;
 };
 
 inline void checkArrowStatus(const arrow::Status& status, const std::string& column,
diff --git a/be/src/vec/data_types/serde/data_type_string_serde.cpp b/be/src/vec/data_types/serde/data_type_string_serde.cpp
index c9bd6427a2..02be5f8072 100644
--- a/be/src/vec/data_types/serde/data_type_string_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_string_serde.cpp
@@ -123,33 +123,45 @@ void DataTypeStringSerDe::read_column_from_arrow(IColumn& column, const arrow::A
         }
     }
 }
+
 template <bool is_binary_format>
-Status DataTypeStringSerDe::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
-        bool col_const) const {
-    int buf_ret = 0;
+Status DataTypeStringSerDe::_write_column_to_mysql(const IColumn& column,
+                                                   MysqlRowBuffer<is_binary_format>& result,
+                                                   int row_idx, bool col_const) const {
     auto& col = assert_cast<const ColumnString&>(column);
-    for (ssize_t i = start; i < end; ++i) {
-        if (0 != buf_ret) {
-            return Status::InternalError("pack mysql buffer failed.");
-        }
-        const auto col_index = index_check_const(i, col_const);
-        const auto string_val = col.get_data_at(col_index);
-        if (string_val.data == nullptr) {
-            if (string_val.size == 0) {
-                // 0x01 is a magic num, not useful actually, just for present ""
-                char* tmp_val = reinterpret_cast<char*>(0x01);
-                buf_ret = result[row_idx].push_string(tmp_val, string_val.size);
-            } else {
-                buf_ret = result[row_idx].push_null();
+    const auto col_index = index_check_const(row_idx, col_const);
+    const auto string_val = col.get_data_at(col_index);
+    if (string_val.data == nullptr) {
+        if (string_val.size == 0) {
+            // 0x01 is a magic num, not useful actually, just for present ""
+            char* tmp_val = reinterpret_cast<char*>(0x01);
+            if (UNLIKELY(0 != result.push_string(tmp_val, string_val.size))) {
+                return Status::InternalError("pack mysql buffer failed.");
             }
         } else {
-            buf_ret = result[row_idx].push_string(string_val.data, string_val.size);
+            if (UNLIKELY(0 != result.push_null())) {
+                return Status::InternalError("pack mysql buffer failed.");
+            }
+        }
+    } else {
+        if (UNLIKELY(0 != result.push_string(string_val.data, string_val.size))) {
+            return Status::InternalError("pack mysql buffer failed.");
         }
-        ++row_idx;
     }
     return Status::OK();
 }
+
+Status DataTypeStringSerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                  bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+Status DataTypeStringSerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                                  bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h b/be/src/vec/data_types/serde/data_type_string_serde.h
index a69c416fa5..1a8189fc8c 100644
--- a/be/src/vec/data_types/serde/data_type_string_serde.h
+++ b/be/src/vec/data_types/serde/data_type_string_serde.h
@@ -46,25 +46,16 @@ public:
                                int end) const override;
     void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
                                 int end, const cctz::time_zone& ctz) const override;
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
 
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
 
 private:
     template <bool is_binary_format>
-    Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                  std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                  int row_idx, int start, int end, bool col_const) const;
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.cpp b/be/src/vec/data_types/serde/data_type_struct_serde.cpp
index 3646b9a7ef..3e384e8140 100644
--- a/be/src/vec/data_types/serde/data_type_struct_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_struct_serde.cpp
@@ -56,48 +56,64 @@ void DataTypeStructSerDe::read_column_from_arrow(IColumn& column, const arrow::A
                                                  const cctz::time_zone& ctz) const {
     LOG(FATAL) << "Not support read " << column.get_name() << " from arrow";
 }
+
 template <bool is_binary_format>
-Status DataTypeStructSerDe::_write_column_to_mysql(
-        const IColumn& column, bool return_object_data_as_binary,
-        std::vector<MysqlRowBuffer<is_binary_format>>& result, int row_idx, int start, int end,
-        bool col_const) const {
-    int buf_ret = 0;
+Status DataTypeStructSerDe::_write_column_to_mysql(const IColumn& column,
+                                                   MysqlRowBuffer<is_binary_format>& result,
+                                                   int row_idx, bool col_const) const {
     auto& col = assert_cast<const ColumnStruct&>(column);
-    for (ssize_t i = start; i < end; ++i) {
-        if (0 != buf_ret) {
-            return Status::InternalError("pack mysql buffer failed.");
-        }
-        const auto col_index = index_check_const(i, col_const);
-        result[row_idx].open_dynamic_mode();
-        buf_ret = result[row_idx].push_string("{", 1);
-        bool begin = true;
-        for (size_t j = 0; j < elemSerDeSPtrs.size(); ++j) {
-            if (!begin) {
-                buf_ret = result[row_idx].push_string(", ", 2);
+    const auto col_index = index_check_const(row_idx, col_const);
+    result.open_dynamic_mode();
+    if (0 != result.push_string("{", 1)) {
+        return Status::InternalError("pack mysql buffer failed.");
+    }
+    bool begin = true;
+    for (size_t j = 0; j < elemSerDeSPtrs.size(); ++j) {
+        if (!begin) {
+            if (0 != result.push_string(", ", 2)) {
+                return Status::InternalError("pack mysql buffer failed.");
             }
+        }
 
-            if (col.get_column_ptr(j)->is_null_at(col_index)) {
-                buf_ret = result[row_idx].push_string("NULL", strlen("NULL"));
-            } else {
-                if (remove_nullable(col.get_column_ptr(j))->is_column_string()) {
-                    buf_ret = result[row_idx].push_string("\"", 1);
-                    RETURN_IF_ERROR(elemSerDeSPtrs[j]->write_column_to_mysql(
-                            col.get_column(j), return_object_data_as_binary, result, row_idx,
-                            col_index, col_index + 1, col_const));
-                    buf_ret = result[row_idx].push_string("\"", 1);
-                } else {
-                    RETURN_IF_ERROR(elemSerDeSPtrs[j]->write_column_to_mysql(
-                            col.get_column(j), return_object_data_as_binary, result, row_idx,
-                            col_index, col_index + 1, col_const));
+        if (col.get_column_ptr(j)->is_null_at(col_index)) {
+            if (0 != result.push_string("NULL", strlen("NULL"))) {
+                return Status::InternalError("pack mysql buffer failed.");
+            }
+        } else {
+            if (remove_nullable(col.get_column_ptr(j))->is_column_string()) {
+                if (0 != result.push_string("\"", 1)) {
+                    return Status::InternalError("pack mysql buffer failed.");
+                }
+                RETURN_IF_ERROR(elemSerDeSPtrs[j]->write_column_to_mysql(col.get_column(j), result,
+                                                                         col_index, col_const));
+                if (0 != result.push_string("\"", 1)) {
+                    return Status::InternalError("pack mysql buffer failed.");
                 }
+            } else {
+                RETURN_IF_ERROR(elemSerDeSPtrs[j]->write_column_to_mysql(col.get_column(j), result,
+                                                                         col_index, col_const));
             }
-            begin = false;
         }
-        buf_ret = result[row_idx].push_string("}", 1);
-        result[row_idx].close_dynamic_mode();
-        ++row_idx;
+        begin = false;
+    }
+    if (UNLIKELY(0 != result.push_string("}", 1))) {
+        return Status::InternalError("pack mysql buffer failed.");
     }
+    result.close_dynamic_mode();
     return Status::OK();
 }
+
+Status DataTypeStructSerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                  bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+Status DataTypeStructSerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                                  bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.h b/be/src/vec/data_types/serde/data_type_struct_serde.h
index b89f3eedb1..3b5c183246 100644
--- a/be/src/vec/data_types/serde/data_type_struct_serde.h
+++ b/be/src/vec/data_types/serde/data_type_struct_serde.h
@@ -56,18 +56,17 @@ public:
                                int end) const override;
     void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
                                 int end, const cctz::time_zone& ctz) const override;
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
-    }
 
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start,
-                                      end, col_const);
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+
+    void set_return_object_as_string(bool value) override {
+        DataTypeSerDe::set_return_object_as_string(value);
+        for (auto& serde : elemSerDeSPtrs) {
+            serde->set_return_object_as_string(value);
+        }
     }
 
 private:
@@ -75,6 +74,9 @@ private:
     Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
                                   std::vector<MysqlRowBuffer<is_binary_format>>& result,
                                   int row_idx, int start, int end, bool col_const) const;
+    template <bool is_binary_format>
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 
     DataTypeSerDeSPtrs elemSerDeSPtrs;
 };
diff --git a/be/src/vec/data_types/serde/data_type_time_serde.cpp b/be/src/vec/data_types/serde/data_type_time_serde.cpp
index 7e21f1f2c4..46d5989a0b 100644
--- a/be/src/vec/data_types/serde/data_type_time_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_time_serde.cpp
@@ -17,5 +17,31 @@
 
 #include "data_type_time_serde.h"
 namespace doris {
-namespace vectorized {} // namespace vectorized
+namespace vectorized {
+
+template <bool is_binary_format>
+Status DataTypeTimeSerDe::_write_column_to_mysql(const IColumn& column,
+                                                 MysqlRowBuffer<is_binary_format>& result,
+                                                 int row_idx, bool col_const) const {
+    auto& data = assert_cast<const ColumnVector<Float64>&>(column).get_data();
+    const auto col_index = index_check_const(row_idx, col_const);
+    if (UNLIKELY(0 != result.push_time(data[col_index]))) {
+        return Status::InternalError("pack mysql buffer failed.");
+    }
+    return Status::OK();
+}
+
+Status DataTypeTimeSerDe::write_column_to_mysql(const IColumn& column,
+                                                MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+Status DataTypeTimeSerDe::write_column_to_mysql(const IColumn& column,
+                                                MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                                bool col_const) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
+}
+
+} // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_time_serde.h b/be/src/vec/data_types/serde/data_type_time_serde.h
index 78fc962466..3c1c14f977 100644
--- a/be/src/vec/data_types/serde/data_type_time_serde.h
+++ b/be/src/vec/data_types/serde/data_type_time_serde.h
@@ -31,37 +31,15 @@ namespace vectorized {
 class Arena;
 
 class DataTypeTimeSerDe : public DataTypeNumberSerDe<Float64> {
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<false>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_date_time_column_to_mysql(column, return_object_data_as_binary, result,
-                                                row_idx, start, end, col_const);
-    }
-    Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary,
-                                 std::vector<MysqlRowBuffer<true>>& result, int row_idx, int start,
-                                 int end, bool col_const) const override {
-        return _write_date_time_column_to_mysql(column, return_object_data_as_binary, result,
-                                                row_idx, start, end, col_const);
-    }
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
+                                 int row_idx, bool col_const) const override;
+    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
+                                 int row_idx, bool col_const) const override;
 
 private:
     template <bool is_binary_format>
-    Status _write_date_time_column_to_mysql(const IColumn& column,
-                                            bool return_object_data_as_binary,
-                                            std::vector<MysqlRowBuffer<is_binary_format>>& result,
-                                            int row_idx, int start, int end, bool col_const) const {
-        int buf_ret = 0;
-        auto& data = assert_cast<const ColumnVector<Float64>&>(column).get_data();
-        for (int i = start; i < end; ++i) {
-            if (0 != buf_ret) {
-                return Status::InternalError("pack mysql buffer failed.");
-            }
-            const auto col_index = index_check_const(i, col_const);
-            buf_ret = result[row_idx].push_time(data[col_index]);
-            ++row_idx;
-        }
-        return Status::OK();
-    }
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const) const;
 };
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp
index 019c0556c4..fd600e0bee 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -610,50 +610,46 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
     // convert one batch
     auto result = std::make_unique<TFetchDataResult>();
     auto num_rows = block.rows();
+    result->result_batch.rows.resize(num_rows);
 
+    uint64_t bytes_sent = 0;
     {
         SCOPED_TIMER(_convert_tuple_timer);
-        if (_rows_buffer.size() < num_rows) {
-            _rows_buffer.resize(num_rows);
+        MysqlRowBuffer<is_binary_format> row_buffer;
+        if constexpr (is_binary_format) {
+            row_buffer.start_binary_row(_output_vexpr_ctxs.size());
         }
 
-        for (size_t i = 0; i != num_rows; ++i) {
-            _rows_buffer[i].reset();
-            if constexpr (is_binary_format) {
-                _rows_buffer[i].start_binary_row(_output_vexpr_ctxs.size());
-            }
-        }
+        struct Arguments {
+            const IColumn* column;
+            bool is_const;
+            DataTypeSerDeSPtr serde;
+        };
 
-        for (int i = 0; status.ok() && i < _output_vexpr_ctxs.size(); ++i) {
+        std::vector<Arguments> arguments;
+        for (int i = 0; i < _output_vexpr_ctxs.size(); ++i) {
             const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column);
-            auto type_ptr = block.get_by_position(i).type;
-
-            DCHECK(num_rows == block.get_by_position(i).column->size())
-                    << fmt::format("block's rows({}) != column{}'s size({})", num_rows, i,
-                                   block.get_by_position(i).column->size());
+            auto serde = block.get_by_position(i).type->get_serde();
+            serde->set_return_object_as_string(output_object_data());
+            arguments.emplace_back(column_ptr.get(), col_const, serde);
+        }
 
-            RETURN_IF_ERROR(type_ptr->get_serde()->write_column_to_mysql(
-                    *column_ptr, output_object_data(), _rows_buffer, 0, 0, num_rows, col_const));
+        for (size_t row_idx = 0; row_idx != num_rows; ++row_idx) {
+            for (int i = 0; i < _output_vexpr_ctxs.size(); ++i) {
+                RETURN_IF_ERROR(arguments[i].serde->write_column_to_mysql(
+                        *(arguments[i].column), row_buffer, row_idx, arguments[i].is_const));
+            }
 
-            if (!status) {
-                LOG(WARNING) << "convert row to mysql result failed. block_struct="
-                             << block.dump_structure();
-                break;
+            // copy MysqlRowBuffer to Thrift
+            result->result_batch.rows[row_idx].append(row_buffer.buf(), row_buffer.length());
+            bytes_sent += row_buffer.length();
+            row_buffer.reset();
+            if constexpr (is_binary_format) {
+                row_buffer.start_binary_row(_output_vexpr_ctxs.size());
             }
         }
     }
-
-    uint64_t bytes_sent = 0;
-    // copy MysqlRowBuffer to Thrift
     {
-        SCOPED_TIMER(_copy_buffer_timer);
-        result->result_batch.rows.resize(num_rows);
-        for (int i = 0; i < num_rows; ++i) {
-            result->result_batch.rows[i].append(_rows_buffer[i].buf(), _rows_buffer[i].length());
-            bytes_sent += _rows_buffer[i].length();
-        }
-    }
-    if (status) {
         SCOPED_TIMER(_result_send_timer);
         // If this is a dry run task, no need to send data block
         if (!_is_dry_run) {
@@ -672,7 +668,6 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
             LOG(WARNING) << "append result batch to sink failed.";
         }
     }
-
     return status;
 }
 
diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h
index 3b6b8579d7..dcd3f4dd54 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -69,7 +69,6 @@ private:
     BufferControlBlock* _sinker;
 
     const VExprContextSPtrs& _output_vexpr_ctxs;
-    std::vector<MysqlRowBuffer<is_binary_format>> _rows_buffer;
 
     RuntimeProfile* _parent_profile; // parent profile from result sink. not owned
     // total time cost on append batch operation


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org