You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/24 14:29:39 UTC

[doris] branch master updated: [refactor](remove unused code) remove storage buffer and orc reader (#16137)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e8eedc521 [refactor](remove unused code) remove storage buffer and orc reader (#16137)
6e8eedc521 is described below

commit 6e8eedc52181be95f466d15ff73defe7f29a4a80
Author: yiguolei <67...@qq.com>
AuthorDate: Tue Jan 24 22:29:32 2023 +0800

    [refactor](remove unused code) remove storage buffer and orc reader (#16137)
    
    remove olap storage byte buffer
    remove orc reader
    remove time operator
    remove read_write_util
    remove aggregate funcs
    remove compress.h and cpp
    remove bhp_lib
    
    Co-authored-by: yiguolei <yi...@gmail.com>
---
 be/src/common/daemon.cpp                 |    2 -
 be/src/exec/CMakeLists.txt               |    1 -
 be/src/exec/arrow/orc_reader.cpp         |  182 --
 be/src/exec/arrow/orc_reader.h           |   61 -
 be/src/exec/read_write_util.cpp          |   72 -
 be/src/exec/read_write_util.h            |  229 ---
 be/src/exprs/CMakeLists.txt              |    2 -
 be/src/exprs/aggregate_functions.cpp     | 3089 ------------------------------
 be/src/exprs/aggregate_functions.h       |  422 ----
 be/src/exprs/time_operators.cpp          |   65 -
 be/src/exprs/time_operators.h            |   43 -
 be/src/olap/CMakeLists.txt               |    4 +-
 be/src/olap/bhp_lib.h                    |  753 --------
 be/src/olap/byte_buffer.cpp              |  204 --
 be/src/olap/byte_buffer.h                |  210 --
 be/src/olap/compress.cpp                 |   91 -
 be/src/olap/compress.h                   |   55 -
 be/src/olap/utils.cpp                    |   12 -
 be/src/olap/utils.h                      |   20 -
 be/src/vec/exec/scan/vfile_scanner.cpp   |    1 -
 be/src/vec/exec/varrow_scanner.h         |    1 -
 be/test/CMakeLists.txt                   |    4 -
 be/test/exprs/bitmap_function_test.cpp   |    1 -
 be/test/exprs/percentile_approx_test.cpp |  142 --
 be/test/exprs/percentile_test.cpp        |  114 --
 be/test/exprs/window_funnel_test.cpp     |  425 ----
 be/test/olap/byte_buffer_test.cpp        |  190 --
 be/test/util/decompress_test.cpp         |    1 -
 28 files changed, 1 insertion(+), 6395 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index a425501749..066b06a4c6 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -33,7 +33,6 @@
 #include "exprs/math_functions.h"
 #include "exprs/quantile_function.h"
 #include "exprs/string_functions.h"
-#include "exprs/time_operators.h"
 #include "exprs/timestamp_functions.h"
 #include "exprs/topn_function.h"
 #include "exprs/utility_functions.h"
@@ -370,7 +369,6 @@ void Daemon::init(int argc, char** argv, const std::vector<StorePath>& paths) {
     MathFunctions::init();
     EncryptionFunctions::init();
     TimestampFunctions::init();
-    TimeOperators::init();
     UtilityFunctions::init();
     JsonFunctions::init();
     GeoFunctions::init();
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 848fe5a8f2..78b520a78b 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -23,7 +23,6 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/exec")
 
 set(EXEC_FILES
     arrow/arrow_reader.cpp
-    arrow/orc_reader.cpp
     arrow/parquet_reader.cpp
     base_scanner.cpp
     data_sink.cpp
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
deleted file mode 100644
index ae5625f2cd..0000000000
--- a/be/src/exec/arrow/orc_reader.cpp
+++ /dev/null
@@ -1,182 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "exec/arrow/orc_reader.h"
-
-#include <arrow/array.h>
-#include <arrow/status.h>
-#include <time.h>
-
-#include "common/logging.h"
-#include "io/file_reader.h"
-#include "runtime/mem_pool.h"
-#include "runtime/runtime_state.h"
-#include "runtime/tuple.h"
-#include "util/string_util.h"
-#include "vec/utils/arrow_column_to_doris_column.h"
-
-namespace doris {
-
-ORCReaderWrap::ORCReaderWrap(RuntimeState* state,
-                             const std::vector<SlotDescriptor*>& file_slot_descs,
-                             FileReader* file_reader, int32_t num_of_columns_from_file,
-                             int64_t range_start_offset, int64_t range_size, bool case_sensitive)
-        : ArrowReaderWrap(state, file_slot_descs, file_reader, num_of_columns_from_file,
-                          case_sensitive),
-          _range_start_offset(range_start_offset),
-          _range_size(range_size) {
-    _reader = nullptr;
-    _cur_file_eof = false;
-}
-
-Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc, const std::string& timezone) {
-    // Open ORC file reader
-    auto maybe_reader =
-            arrow::adapters::orc::ORCFileReader::Open(_arrow_file, arrow::default_memory_pool());
-    if (!maybe_reader.ok()) {
-        // Handle error instantiating file reader...
-        LOG(WARNING) << "failed to create orc file reader, errmsg=" << maybe_reader.status();
-        return Status::InternalError("Failed to create orc file reader");
-    }
-    _reader = std::move(maybe_reader.ValueOrDie());
-    _total_groups = _reader->NumberOfStripes();
-    if (_total_groups == 0) {
-        return Status::EndOfFile("Empty Orc File");
-    }
-    // seek file position after _reader created.
-    RETURN_IF_ERROR(_seek_start_stripe());
-
-    // map
-    arrow::Result<std::shared_ptr<arrow::Schema>> maybe_schema = _reader->ReadSchema();
-    if (!maybe_schema.ok()) {
-        // Handle error instantiating file reader...
-        LOG(WARNING) << "failed to read schema, errmsg=" << maybe_schema.status();
-        return Status::InternalError("Failed to create orc file reader");
-    }
-    _schema = maybe_schema.ValueOrDie();
-    for (size_t i = 0; i < _schema->num_fields(); ++i) {
-        std::string schemaName =
-                _case_sensitive ? _schema->field(i)->name() : to_lower(_schema->field(i)->name());
-        // orc index started from 1.
-        _map_column.emplace(schemaName, i + 1);
-    }
-    RETURN_IF_ERROR(column_indices());
-
-    _thread = std::thread(&ArrowReaderWrap::prefetch_batch, this);
-
-    return Status::OK();
-}
-
-Status ORCReaderWrap::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
-                                  std::unordered_set<std::string>* missing_cols) {
-    for (size_t i = 0; i < _schema->num_fields(); ++i) {
-        std::string schema_name =
-                _case_sensitive ? _schema->field(i)->name() : to_lower(_schema->field(i)->name());
-        TypeDescriptor type;
-        RETURN_IF_ERROR(
-                vectorized::arrow_type_to_doris_type(_schema->field(i)->type()->id(), &type));
-        name_to_type->emplace(schema_name, type);
-    }
-
-    for (auto& col : _missing_cols) {
-        missing_cols->insert(col);
-    }
-    return Status::OK();
-}
-
-Status ORCReaderWrap::_seek_start_stripe() {
-    // If file was from Hms table, _range_start_offset is started from 3(magic word).
-    // And if file was from load, _range_start_offset is always set to zero.
-    // So now we only support file split for hms table.
-    // TODO: support file split for loading.
-    if (_range_size <= 0 || _range_start_offset == 0) {
-        return Status::OK();
-    }
-    int64_t row_number = 0;
-    int start_group = _current_group;
-    int end_group = _total_groups;
-    for (int i = 0; i < _total_groups; i++) {
-        int64_t _offset = _reader->GetRawORCReader()->getStripe(i)->getOffset();
-        int64_t row = _reader->GetRawORCReader()->getStripe(i)->getNumberOfRows();
-        if (_offset < _range_start_offset) {
-            row_number += row;
-        } else if (_offset == _range_start_offset) {
-            // If using the external file scan, _range_start_offset is always in the offset lists.
-            // If using broker load, _range_start_offset is always set to be 0.
-            start_group = i;
-        }
-        if (_range_start_offset + _range_size <= _offset) {
-            end_group = i;
-            break;
-        }
-    }
-
-    LOG(INFO) << "This reader read orc file from offset: " << _range_start_offset
-              << " with size: " << _range_size << ". Also mean that read from strip id from "
-              << start_group << " to " << end_group;
-
-    if (!_reader->Seek(row_number).ok()) {
-        LOG(WARNING) << "Failed to seek to the line number: " << row_number;
-        return Status::InternalError("Failed to seek to the line number");
-    }
-
-    _current_group = start_group;
-    _total_groups = end_group;
-
-    return Status::OK();
-}
-
-Status ORCReaderWrap::_next_stripe_reader(bool* eof) {
-    if (_current_group >= _total_groups) {
-        *eof = true;
-        return Status::OK();
-    }
-    // Get a stripe level record batch iterator.
-    // record batch will have up to batch_size rows.
-    // NextStripeReader serves as a fine grained alternative to ReadStripe
-    // which may cause OOM issues by loading the whole stripe into memory.
-    // Note this will only read rows for the current stripe, not the entire file.
-    arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> maybe_rb_reader =
-            _reader->NextStripeReader(_state->batch_size(), _include_cols);
-    if (!maybe_rb_reader.ok()) {
-        LOG(WARNING) << "Get RecordBatch Failed. " << maybe_rb_reader.status();
-        return Status::InternalError(maybe_rb_reader.status().ToString());
-    }
-    _rb_reader = maybe_rb_reader.ValueOrDie();
-    _current_group++;
-    return Status::OK();
-}
-
-void ORCReaderWrap::read_batches(arrow::RecordBatchVector& batches, int current_group) {
-    bool eof = false;
-    Status status = _next_stripe_reader(&eof);
-    if (!status.ok()) {
-        _closed = true;
-        return;
-    }
-    if (eof) {
-        _closed = true;
-        return;
-    }
-
-    _status = _rb_reader->ReadAll(&batches);
-}
-
-bool ORCReaderWrap::filter_row_group(int current_group) {
-    return false;
-}
-
-} // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
deleted file mode 100644
index f210f66909..0000000000
--- a/be/src/exec/arrow/orc_reader.h
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <arrow/adapters/orc/adapter.h>
-#include <arrow/api.h>
-#include <arrow/buffer.h>
-#include <stdint.h>
-
-#include <map>
-#include <string>
-
-#include "common/status.h"
-#include "exec/arrow/arrow_reader.h"
-
-namespace doris {
-
-// Reader of ORC file
-class ORCReaderWrap final : public ArrowReaderWrap {
-public:
-    ORCReaderWrap(RuntimeState* state, const std::vector<SlotDescriptor*>& file_slot_descs,
-                  FileReader* file_reader, int32_t num_of_columns_from_file,
-                  int64_t range_start_offset, int64_t range_size, bool case_sensitive = true);
-    ~ORCReaderWrap() override = default;
-
-    Status init_reader(const TupleDescriptor* tuple_desc, const std::string& timezone) override;
-
-    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
-                       std::unordered_set<std::string>* missing_cols) override;
-
-private:
-    Status _next_stripe_reader(bool* eof);
-    Status _seek_start_stripe();
-    void read_batches(arrow::RecordBatchVector& batches, int current_group) override;
-    bool filter_row_group(int current_group) override;
-
-private:
-    // orc file reader object
-    std::unique_ptr<arrow::adapters::orc::ORCFileReader> _reader;
-    std::shared_ptr<arrow::Schema> _schema;
-    bool _cur_file_eof; // is read over?
-    int64_t _range_start_offset;
-    int64_t _range_size;
-};
-
-} // namespace doris
diff --git a/be/src/exec/read_write_util.cpp b/be/src/exec/read_write_util.cpp
deleted file mode 100644
index 0c8fa063c6..0000000000
--- a/be/src/exec/read_write_util.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/read_write_util.h"
-
-namespace doris {
-
-int ReadWriteUtil::put_zint(int32_t integer, uint8_t* buf) {
-    // Move the sign bit to the first bit.
-    uint32_t uinteger = (integer << 1) ^ (integer >> 31);
-    const int mask = 0x7f;
-    const int cont = 0x80;
-    buf[0] = uinteger & mask;
-    int len = 1;
-
-    while ((uinteger >>= 7) != 0) {
-        // Set the continuation bit.
-        buf[len - 1] |= cont;
-        buf[len] = uinteger & mask;
-        ++len;
-    }
-
-    return len;
-}
-
-int ReadWriteUtil::put_zlong(int64_t longint, uint8_t* buf) {
-    // Move the sign bit to the first bit.
-    uint64_t ulongint = (longint << 1) ^ (longint >> 63);
-    const int mask = 0x7f;
-    const int cont = 0x80;
-    buf[0] = ulongint & mask;
-    int len = 1;
-
-    while ((ulongint >>= 7) != 0) {
-        // Set the continuation bit.
-        buf[len - 1] |= cont;
-        buf[len] = ulongint & mask;
-        ++len;
-    }
-
-    return len;
-}
-
-std::string ReadWriteUtil::hex_dump(const uint8_t* buf, int64_t length) {
-    std::stringstream ss;
-    ss << std::hex;
-
-    for (int i = 0; i < length; ++i) {
-        ss << static_cast<int>(buf[i]) << " ";
-    }
-
-    return ss.str();
-}
-
-std::string ReadWriteUtil::hex_dump(const char* buf, int64_t length) {
-    return hex_dump(reinterpret_cast<const uint8_t*>(buf), length);
-}
-} // namespace doris
diff --git a/be/src/exec/read_write_util.h b/be/src/exec/read_write_util.h
deleted file mode 100644
index 0b387e0fbb..0000000000
--- a/be/src/exec/read_write_util.h
+++ /dev/null
@@ -1,229 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <sstream>
-
-#include "common/status.h"
-
-namespace doris {
-
-#define RETURN_IF_FALSE(x) \
-    if (UNLIKELY(!(x))) return false
-
-// Class for reading and writing various data types.
-class ReadWriteUtil {
-public:
-    // Maximum length for Writeable VInt
-    static const int MAX_VINT_LEN = 9;
-
-    // Maximum lengths for Zigzag encodings.
-    const static int MAX_ZINT_LEN = 5;
-    const static int MAX_ZLONG_LEN = 10;
-
-    // Put a zigzag encoded integer into a buffer and return its length.
-    static int put_zint(int32_t integer, uint8_t* buf);
-
-    // Put a zigzag encoded long integer into a buffer and return its length.
-    static int put_zlong(int64_t longint, uint8_t* buf);
-
-    // Get a big endian integer from a buffer.  The buffer does not have to be word aligned.
-    static int32_t get_int(const uint8_t* buffer);
-    static int16_t get_small_int(const uint8_t* buffer);
-    static int64_t get_long_int(const uint8_t* buffer);
-
-    // Get a variable-length Long or int value from a byte buffer.
-    // Returns the length of the long/int
-    // If the size byte is corrupted then return -1;
-    static int get_vlong(uint8_t* buf, int64_t* vlong);
-    static int get_vint(uint8_t* buf, int32_t* vint);
-
-    // Read a variable-length Long value from a byte buffer starting at the specified
-    // byte offset.
-    static int get_vlong(uint8_t* buf, int64_t offset, int64_t* vlong);
-
-    // Put an Integer into a buffer in big endian order .  The buffer must be at least
-    // 4 bytes long.
-    static void put_int(uint8_t* buf, int32_t integer);
-
-    // Dump the first length bytes of buf to a Hex string.
-    static std::string hex_dump(const uint8_t* buf, int64_t length);
-    static std::string hex_dump(const char* buf, int64_t length);
-
-    // Determines the sign of a VInt/VLong from the first byte.
-    static bool is_negative_vint(int8_t byte);
-
-    // Determines the total length in bytes of a Writable VInt/VLong from the first byte.
-    static int decode_vint_size(int8_t byte);
-
-    // The following methods read data from a buffer without assuming the buffer is long
-    // enough. If the buffer isn't long enough or another error occurs, they return false
-    // and update the status with the error. Otherwise they return true. buffer is advanced
-    // past the data read and buf_len is decremented appropriately.
-
-    // Read a zig-zag encoded long. This is the integer encoding defined by google.com
-    // protocol-buffers: https://developers.google.com/protocol-buffers/docs/encoding
-    static bool read_zlong(uint8_t** buf, int* buf_len, int64_t* val, Status* status);
-
-    // Read a zig-zag encoded int.
-    static bool read_zint(uint8_t** buf, int* buf_len, int32_t* val, Status* status);
-
-    // Read a native type T (e.g. bool, float) directly into output (i.e. input is cast
-    // directly to T and incremented by sizeof(T)).
-    template <class T>
-    static bool read(uint8_t** buf, int* buf_len, T* val, Status* status);
-
-    // Skip the next num_bytes bytes.
-    static bool skip_bytes(uint8_t** buf, int* buf_len, int num_bytes, Status* status);
-};
-
-inline int16_t ReadWriteUtil::get_small_int(const uint8_t* buf) {
-    return (buf[0] << 8) | buf[1];
-}
-
-inline int32_t ReadWriteUtil::get_int(const uint8_t* buf) {
-    return (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
-}
-
-inline int64_t ReadWriteUtil::get_long_int(const uint8_t* buf) {
-    return (static_cast<int64_t>(buf[0]) << 56) | (static_cast<int64_t>(buf[1]) << 48) |
-           (static_cast<int64_t>(buf[2]) << 40) | (static_cast<int64_t>(buf[3]) << 32) |
-           (buf[4] << 24) | (buf[5] << 16) | (buf[6] << 8) | buf[7];
-}
-
-inline void ReadWriteUtil::put_int(uint8_t* buf, int32_t integer) {
-    buf[0] = integer >> 24;
-    buf[1] = integer >> 16;
-    buf[2] = integer >> 8;
-    buf[3] = integer;
-}
-
-inline int ReadWriteUtil::get_vint(uint8_t* buf, int32_t* vint) {
-    int64_t vlong = 0;
-    int len = get_vlong(buf, &vlong);
-    *vint = static_cast<int32_t>(vlong);
-    return len;
-}
-
-inline int ReadWriteUtil::get_vlong(uint8_t* buf, int64_t* vlong) {
-    return get_vlong(buf, 0, vlong);
-}
-
-inline int ReadWriteUtil::get_vlong(uint8_t* buf, int64_t offset, int64_t* vlong) {
-    int8_t firstbyte = (int8_t)buf[0 + offset];
-
-    int len = decode_vint_size(firstbyte);
-
-    if (len > MAX_VINT_LEN) {
-        return -1;
-    }
-
-    if (len == 1) {
-        *vlong = static_cast<int64_t>(firstbyte);
-        return len;
-    }
-
-    *vlong &= ~*vlong;
-
-    for (int i = 1; i < len; i++) {
-        *vlong = (*vlong << 8) | buf[i + offset];
-    }
-
-    if (is_negative_vint(firstbyte)) {
-        *vlong = *vlong ^ ((int64_t)-1);
-    }
-
-    return len;
-}
-
-inline bool ReadWriteUtil::read_zint(uint8_t** buf, int* buf_len, int32_t* val, Status* status) {
-    int64_t zlong;
-    RETURN_IF_FALSE(read_zlong(buf, buf_len, &zlong, status));
-    *val = static_cast<int32_t>(zlong);
-    return true;
-}
-
-inline bool ReadWriteUtil::read_zlong(uint8_t** buf, int* buf_len, int64_t* val, Status* status) {
-    uint64_t zlong = 0;
-    int shift = 0;
-    bool more;
-
-    do {
-        DCHECK_LE(shift, 64);
-
-        if (UNLIKELY(*buf_len < 1)) {
-            *status = Status::InternalError("Insufficient buffer length");
-            return false;
-        }
-
-        zlong |= static_cast<uint64_t>(**buf & 0x7f) << shift;
-        shift += 7;
-        more = (**buf & 0x80) != 0;
-        ++(*buf);
-        --(*buf_len);
-    } while (more);
-
-    *val = (zlong >> 1) ^ -(zlong & 1);
-    return true;
-}
-
-template <class T>
-inline bool ReadWriteUtil::read(uint8_t** buf, int* buf_len, T* val, Status* status) {
-    int val_len = sizeof(T);
-
-    if (UNLIKELY(val_len > *buf_len)) {
-        *status = Status::InternalError("Cannot read {} bytes, buffer length is {}", val_len,
-                                        *buf_len);
-        return false;
-    }
-
-    *val = *reinterpret_cast<T*>(*buf);
-    *buf += val_len;
-    *buf_len -= val_len;
-    return true;
-}
-
-inline bool ReadWriteUtil::skip_bytes(uint8_t** buf, int* buf_len, int num_bytes, Status* status) {
-    DCHECK_GE(*buf_len, 0);
-
-    if (UNLIKELY(num_bytes > *buf_len)) {
-        *status = Status::InternalError("Cannot skip {} bytes, buffer length is {}", num_bytes,
-                                        *buf_len);
-        return false;
-    }
-
-    *buf += num_bytes;
-    *buf_len -= num_bytes;
-    return true;
-}
-
-inline bool ReadWriteUtil::is_negative_vint(int8_t byte) {
-    return byte < -120 || (byte >= -112 && byte < 0);
-}
-
-inline int ReadWriteUtil::decode_vint_size(int8_t byte) {
-    if (byte >= -112) {
-        return 1;
-    } else if (byte < -120) {
-        return -119 - byte;
-    }
-
-    return -111 - byte;
-}
-
-} // namespace doris
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 4cc2480854..4cc7888b27 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -23,10 +23,8 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/exprs")
 
 add_library(Exprs
   encryption_functions.cpp
-  aggregate_functions.cpp
   anyval_util.cpp
   cast_functions.cpp
-  time_operators.cpp
   hash_functions.cpp
   block_bloom_filter_avx_impl.cc
   block_bloom_filter_impl.cc
diff --git a/be/src/exprs/aggregate_functions.cpp b/be/src/exprs/aggregate_functions.cpp
deleted file mode 100644
index b84bbec8fa..0000000000
--- a/be/src/exprs/aggregate_functions.cpp
+++ /dev/null
@@ -1,3089 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// https://github.com/apache/impala/blob/branch-2.9.0/be/src/exprs/aggregate-functions.cpp
-// and modified by Doris
-
-// include aggregate_functions.h first to make sure that all need includes is written in header files
-#include "exprs/aggregate_functions.h"
-
-#include <math.h>
-
-#include <sstream>
-#include <unordered_set>
-
-#include "common/logging.h"
-#include "exprs/anyval_util.h"
-#include "exprs/hybrid_set.h"
-#include "olap/hll.h"
-#include "runtime/datetime_value.h"
-#include "runtime/decimalv2_value.h"
-#include "udf/udf_internal.h"
-#include "util/counts.h"
-#include "util/tdigest.h"
-#include "vec/common/string_ref.h"
-
-// TODO: this file should be cross compiled and then all of the builtin
-// aggregate functions will have a codegen enabled path. Then we can remove
-// the custom code in aggregation node.
-namespace doris {
-using doris_udf::FunctionContext;
-using doris_udf::BooleanVal;
-using doris_udf::TinyIntVal;
-using doris_udf::SmallIntVal;
-using doris_udf::IntVal;
-using doris_udf::BigIntVal;
-using doris_udf::LargeIntVal;
-using doris_udf::FloatVal;
-using doris_udf::DoubleVal;
-using doris_udf::DecimalV2Val;
-using doris_udf::DateTimeVal;
-using doris_udf::StringVal;
-using doris_udf::AnyVal;
-
-// Delimiter to use if the separator is nullptr.
-static const StringVal DEFAULT_STRING_CONCAT_DELIM((uint8_t*)", ", 2);
-
-void AggregateFunctions::init_null(FunctionContext*, AnyVal* dst) {
-    dst->is_null = true;
-}
-
-template <typename T>
-void AggregateFunctions::init_zero_not_null(FunctionContext*, T* dst) {
-    dst->is_null = false;
-    dst->val = 0;
-}
-
-template <>
-void AggregateFunctions::init_zero_not_null(FunctionContext*, DecimalV2Val* dst) {
-    dst->is_null = false;
-    dst->set_to_zero();
-}
-
-template <typename T>
-void AggregateFunctions::init_zero(FunctionContext*, T* dst) {
-    dst->is_null = false;
-    dst->val = 0;
-}
-
-template <>
-void AggregateFunctions::init_zero(FunctionContext*, DecimalV2Val* dst) {
-    dst->is_null = false;
-    dst->set_to_zero();
-}
-
-template <typename T>
-void AggregateFunctions::init_zero_null(FunctionContext*, T* dst) {
-    dst->is_null = true;
-    dst->val = 0;
-}
-
-template <>
-void AggregateFunctions::init_zero_null(FunctionContext*, DecimalV2Val* dst) {
-    dst->is_null = true;
-    dst->set_to_zero();
-}
-
-template <typename SRC_VAL, typename DST_VAL>
-void AggregateFunctions::sum_remove(FunctionContext* ctx, const SRC_VAL& src, DST_VAL* dst) {
-    // Do not count null values towards the number of removes
-    if (src.is_null) {
-        ctx->impl()->increment_num_removes(-1);
-    }
-    if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) {
-        *dst = DST_VAL::null();
-        return;
-    }
-    if (src.is_null) {
-        return;
-    }
-    if (dst->is_null) {
-        init_zero_not_null<DST_VAL>(ctx, dst);
-    }
-    dst->val -= src.val;
-}
-
-template <>
-void AggregateFunctions::sum_remove(FunctionContext* ctx, const DecimalV2Val& src,
-                                    DecimalV2Val* dst) {
-    if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) {
-        *dst = DecimalV2Val::null();
-        return;
-    }
-    if (src.is_null) {
-        return;
-    }
-    if (dst->is_null) {
-        init_zero_not_null<DecimalV2Val>(ctx, dst);
-    }
-
-    DecimalV2Value new_src = DecimalV2Value::from_decimal_val(src);
-    DecimalV2Value new_dst = DecimalV2Value::from_decimal_val(*dst);
-    new_dst = new_dst - new_src;
-    new_dst.to_decimal_val(dst);
-}
-
-StringVal AggregateFunctions::string_val_get_value(FunctionContext* ctx, const StringVal& src) {
-    if (src.is_null) {
-        return src;
-    }
-    StringVal result(ctx, src.len);
-    memcpy(result.ptr, src.ptr, src.len);
-    return result;
-}
-
-StringVal AggregateFunctions::string_val_serialize_or_finalize(FunctionContext* ctx,
-                                                               const StringVal& src) {
-    StringVal result = string_val_get_value(ctx, src);
-    if (!src.is_null) {
-        ctx->free(src.ptr);
-    }
-    return result;
-}
-
-void AggregateFunctions::count_update(FunctionContext*, const AnyVal& src, BigIntVal* dst) {
-    DCHECK(!dst->is_null);
-
-    if (!src.is_null) {
-        ++dst->val;
-    }
-}
-
-void AggregateFunctions::count_merge(FunctionContext*, const BigIntVal& src, BigIntVal* dst) {
-    DCHECK(!dst->is_null);
-    DCHECK(!src.is_null);
-    dst->val += src.val;
-}
-
-void AggregateFunctions::count_remove(FunctionContext*, const AnyVal& src, BigIntVal* dst) {
-    DCHECK(!dst->is_null);
-    if (!src.is_null) {
-        --dst->val;
-        DCHECK_GE(dst->val, 0);
-    }
-}
-
-struct PercentileState {
-    Counts counts;
-    double quantile = -1.0;
-};
-
-void AggregateFunctions::percentile_init(FunctionContext* ctx, StringVal* dst) {
-    dst->is_null = false;
-    dst->len = sizeof(PercentileState);
-    dst->ptr = (uint8_t*)new PercentileState();
-}
-
-template <typename T>
-void AggregateFunctions::percentile_update(FunctionContext* ctx, const T& src,
-                                           const DoubleVal& quantile, StringVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(PercentileState), dst->len);
-
-    PercentileState* percentile = reinterpret_cast<PercentileState*>(dst->ptr);
-    percentile->counts.increment(src.val, 1);
-    percentile->quantile = quantile.val;
-}
-
-void AggregateFunctions::percentile_merge(FunctionContext* ctx, const StringVal& src,
-                                          StringVal* dst) {
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(PercentileState), dst->len);
-
-    double quantile;
-    memcpy(&quantile, src.ptr, sizeof(double));
-
-    PercentileState* src_percentile = new PercentileState();
-    src_percentile->quantile = quantile;
-    src_percentile->counts.unserialize(src.ptr + sizeof(double));
-
-    PercentileState* dst_percentile = reinterpret_cast<PercentileState*>(dst->ptr);
-    dst_percentile->counts.merge(&src_percentile->counts);
-    if (dst_percentile->quantile == -1.0) {
-        dst_percentile->quantile = quantile;
-    }
-
-    delete src_percentile;
-}
-
-StringVal AggregateFunctions::percentile_serialize(FunctionContext* ctx, const StringVal& src) {
-    DCHECK(!src.is_null);
-
-    PercentileState* percentile = reinterpret_cast<PercentileState*>(src.ptr);
-    uint32_t serialize_size = percentile->counts.serialized_size();
-    StringVal result(ctx, sizeof(double) + serialize_size);
-    memcpy(result.ptr, &percentile->quantile, sizeof(double));
-    percentile->counts.serialize(result.ptr + sizeof(double));
-
-    delete percentile;
-    return result;
-}
-
-DoubleVal AggregateFunctions::percentile_finalize(FunctionContext* ctx, const StringVal& src) {
-    PercentileState* percentile = reinterpret_cast<PercentileState*>(src.ptr);
-    double quantile = percentile->quantile;
-    auto result = percentile->counts.terminate(quantile);
-
-    delete percentile;
-    return result;
-}
-
-struct PercentileApproxState {
-public:
-    PercentileApproxState() : digest(new TDigest()) {}
-    PercentileApproxState(double compression) : digest(new TDigest(compression)) {}
-    ~PercentileApproxState() { delete digest; }
-    static constexpr double INIT_QUANTILE = -1.0;
-
-    TDigest* digest = nullptr;
-    double targetQuantile = INIT_QUANTILE;
-};
-
-void AggregateFunctions::percentile_approx_init(FunctionContext* ctx, StringVal* dst) {
-    dst->is_null = false;
-    dst->len = sizeof(PercentileApproxState);
-    const AnyVal* digest_compression = ctx->get_constant_arg(2);
-    if (digest_compression != nullptr) {
-        double compression = reinterpret_cast<const DoubleVal*>(digest_compression)->val;
-        if (compression >= 2048 && compression <= 10000) {
-            dst->ptr = (uint8_t*)new PercentileApproxState(compression);
-            return;
-        }
-    }
-
-    dst->ptr = (uint8_t*)new PercentileApproxState();
-};
-
-template <typename T>
-void AggregateFunctions::percentile_approx_update(FunctionContext* ctx, const T& src,
-                                                  const DoubleVal& quantile, StringVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(PercentileApproxState), dst->len);
-
-    PercentileApproxState* percentile = reinterpret_cast<PercentileApproxState*>(dst->ptr);
-    percentile->digest->add(src.val);
-    percentile->targetQuantile = quantile.val;
-}
-
-template <typename T>
-void AggregateFunctions::percentile_approx_update(FunctionContext* ctx, const T& src,
-                                                  const DoubleVal& quantile,
-                                                  const DoubleVal& digest_compression,
-                                                  StringVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(PercentileApproxState), dst->len);
-
-    PercentileApproxState* percentile = reinterpret_cast<PercentileApproxState*>(dst->ptr);
-    percentile->digest->add(src.val);
-    percentile->targetQuantile = quantile.val;
-}
-
-StringVal AggregateFunctions::percentile_approx_serialize(FunctionContext* ctx,
-                                                          const StringVal& src) {
-    DCHECK(!src.is_null);
-
-    PercentileApproxState* percentile = reinterpret_cast<PercentileApproxState*>(src.ptr);
-    uint32_t serialized_size = percentile->digest->serialized_size();
-    StringVal result(ctx, sizeof(double) + serialized_size);
-    memcpy(result.ptr, &percentile->targetQuantile, sizeof(double));
-    percentile->digest->serialize(result.ptr + sizeof(double));
-
-    delete percentile;
-    return result;
-}
-
-void AggregateFunctions::percentile_approx_merge(FunctionContext* ctx, const StringVal& src,
-                                                 StringVal* dst) {
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(PercentileApproxState), dst->len);
-
-    double quantile;
-    memcpy(&quantile, src.ptr, sizeof(double));
-
-    PercentileApproxState* src_percentile = new PercentileApproxState();
-    src_percentile->targetQuantile = quantile;
-    src_percentile->digest->unserialize(src.ptr + sizeof(double));
-
-    PercentileApproxState* dst_percentile = reinterpret_cast<PercentileApproxState*>(dst->ptr);
-    dst_percentile->digest->merge(src_percentile->digest);
-    // dst_percentile->targetQuantile only need set once from child result
-    // for example:
-    //    child result targetQuantile is (0.5, -1), we should set 0.5 once to make sure correct result
-    if (dst_percentile->targetQuantile == PercentileApproxState::INIT_QUANTILE) {
-        dst_percentile->targetQuantile = quantile;
-    }
-
-    delete src_percentile;
-}
-
-DoubleVal AggregateFunctions::percentile_approx_finalize(FunctionContext* ctx,
-                                                         const StringVal& src) {
-    PercentileApproxState* percentile = reinterpret_cast<PercentileApproxState*>(src.ptr);
-    double quantile = percentile->targetQuantile;
-    double result = percentile->digest->quantile(quantile);
-
-    delete percentile;
-    if (isnan(result)) {
-        return DoubleVal(result).null();
-    } else {
-        return DoubleVal(result);
-    }
-}
-
-struct AvgState {
-    double sum = 0;
-    int64_t count = 0;
-};
-
-struct DecimalV2AvgState {
-    DecimalV2Val sum;
-    int64_t count = 0;
-};
-
-void AggregateFunctions::avg_init(FunctionContext* ctx, StringVal* dst) {
-    dst->is_null = false;
-    dst->len = sizeof(AvgState);
-    dst->ptr = ctx->allocate(dst->len);
-    new (dst->ptr) AvgState;
-}
-
-void AggregateFunctions::decimalv2_avg_init(FunctionContext* ctx, StringVal* dst) {
-    dst->is_null = false;
-    dst->len = sizeof(DecimalV2AvgState);
-    // The memory for int128 need to be aligned by 16.
-    // So the constructor has been used instead of allocating memory.
-    // Also, it will be release in finalize.
-    dst->ptr = (uint8_t*)new DecimalV2AvgState;
-}
-
-template <typename T>
-void AggregateFunctions::avg_update(FunctionContext* ctx, const T& src, StringVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(AvgState), dst->len);
-    AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
-    avg->sum += src.val;
-    ++avg->count;
-}
-
-void AggregateFunctions::decimalv2_avg_update(FunctionContext* ctx, const DecimalV2Val& src,
-                                              StringVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(DecimalV2AvgState), dst->len);
-    DecimalV2AvgState* avg = reinterpret_cast<DecimalV2AvgState*>(dst->ptr);
-
-    DecimalV2Value v1 = DecimalV2Value::from_decimal_val(avg->sum);
-    DecimalV2Value v2 = DecimalV2Value::from_decimal_val(src);
-    DecimalV2Value v = v1 + v2;
-    v.to_decimal_val(&avg->sum);
-
-    ++avg->count;
-}
-
-StringVal AggregateFunctions::decimalv2_avg_serialize(FunctionContext* ctx, const StringVal& src) {
-    DCHECK(!src.is_null);
-    StringVal result(ctx, src.len);
-    memcpy(result.ptr, src.ptr, src.len);
-    delete (DecimalV2AvgState*)src.ptr;
-    return result;
-}
-
-template <typename T>
-void AggregateFunctions::avg_remove(FunctionContext* ctx, const T& src, StringVal* dst) {
-    // Remove doesn't need to explicitly check the number of calls to Update() or Remove()
-    // because Finalize() returns nullptr if count is 0.
-    if (src.is_null) {
-        return;
-    }
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(AvgState), dst->len);
-    AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
-    avg->sum -= src.val;
-    --avg->count;
-    DCHECK_GE(avg->count, 0);
-}
-
-void AggregateFunctions::decimalv2_avg_remove(doris_udf::FunctionContext* ctx,
-                                              const DecimalV2Val& src, StringVal* dst) {
-    // Remove doesn't need to explicitly check the number of calls to Update() or Remove()
-    // because Finalize() returns nullptr if count is 0.
-    if (src.is_null) {
-        return;
-    }
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(DecimalV2AvgState), dst->len);
-    DecimalV2AvgState* avg = reinterpret_cast<DecimalV2AvgState*>(dst->ptr);
-
-    DecimalV2Value v1 = DecimalV2Value::from_decimal_val(avg->sum);
-    DecimalV2Value v2 = DecimalV2Value::from_decimal_val(src);
-    DecimalV2Value v = v1 - v2;
-    v.to_decimal_val(&avg->sum);
-
-    --avg->count;
-    DCHECK_GE(avg->count, 0);
-}
-
-void AggregateFunctions::avg_merge(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
-    const AvgState* src_struct = reinterpret_cast<const AvgState*>(src.ptr);
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(AvgState), dst->len);
-    AvgState* dst_struct = reinterpret_cast<AvgState*>(dst->ptr);
-    dst_struct->sum += src_struct->sum;
-    dst_struct->count += src_struct->count;
-}
-
-void AggregateFunctions::decimalv2_avg_merge(FunctionContext* ctx, const StringVal& src,
-                                             StringVal* dst) {
-    DecimalV2AvgState src_struct;
-    memcpy(&src_struct, src.ptr, sizeof(DecimalV2AvgState));
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(DecimalV2AvgState), dst->len);
-    DecimalV2AvgState* dst_struct = reinterpret_cast<DecimalV2AvgState*>(dst->ptr);
-
-    DecimalV2Value v1 = DecimalV2Value::from_decimal_val(dst_struct->sum);
-    DecimalV2Value v2 = DecimalV2Value::from_decimal_val(src_struct.sum);
-    DecimalV2Value v = v1 + v2;
-    v.to_decimal_val(&dst_struct->sum);
-    dst_struct->count += src_struct.count;
-}
-
-DoubleVal AggregateFunctions::avg_get_value(FunctionContext* ctx, const StringVal& src) {
-    AvgState* val_struct = reinterpret_cast<AvgState*>(src.ptr);
-    if (val_struct->count == 0) {
-        return DoubleVal::null();
-    }
-    return DoubleVal(val_struct->sum / val_struct->count);
-}
-
-DecimalV2Val AggregateFunctions::decimalv2_avg_get_value(FunctionContext* ctx,
-                                                         const StringVal& src) {
-    DecimalV2AvgState* val_struct = reinterpret_cast<DecimalV2AvgState*>(src.ptr);
-    if (val_struct->count == 0) {
-        return DecimalV2Val::null();
-    }
-    DecimalV2Value v1 = DecimalV2Value::from_decimal_val(val_struct->sum);
-    DecimalV2Value v = v1 / DecimalV2Value(val_struct->count, 0);
-    DecimalV2Val res;
-    v.to_decimal_val(&res);
-
-    return res;
-}
-
-DoubleVal AggregateFunctions::avg_finalize(FunctionContext* ctx, const StringVal& src) {
-    if (src.is_null) {
-        return DoubleVal::null();
-    }
-    DoubleVal result = avg_get_value(ctx, src);
-    ctx->free(src.ptr);
-    return result;
-}
-
-DecimalV2Val AggregateFunctions::decimalv2_avg_finalize(FunctionContext* ctx,
-                                                        const StringVal& src) {
-    DecimalV2Val result = decimalv2_avg_get_value(ctx, src);
-    delete (DecimalV2AvgState*)src.ptr;
-    return result;
-}
-
-void AggregateFunctions::timestamp_avg_update(FunctionContext* ctx, const DateTimeVal& src,
-                                              StringVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(AvgState), dst->len);
-    AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
-    double val = DateTimeValue::from_datetime_val(src);
-    avg->sum += val;
-    ++avg->count;
-}
-
-void AggregateFunctions::timestamp_avg_remove(FunctionContext* ctx, const DateTimeVal& src,
-                                              StringVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(AvgState), dst->len);
-    AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
-    double val = DateTimeValue::from_datetime_val(src);
-    avg->sum -= val;
-    --avg->count;
-    DCHECK_GE(avg->count, 0);
-}
-
-DateTimeVal AggregateFunctions::timestamp_avg_get_value(FunctionContext* ctx,
-                                                        const StringVal& src) {
-    AvgState* val_struct = reinterpret_cast<AvgState*>(src.ptr);
-    if (val_struct->count == 0) {
-        return DateTimeVal::null();
-    }
-    DateTimeValue tv(val_struct->sum / val_struct->count);
-    DateTimeVal result;
-    tv.to_datetime_val(&result);
-    return result;
-}
-
-DateTimeVal AggregateFunctions::timestamp_avg_finalize(FunctionContext* ctx, const StringVal& src) {
-    if (src.is_null) {
-        return DateTimeVal::null();
-    }
-    DateTimeVal result = timestamp_avg_get_value(ctx, src);
-    ctx->free(src.ptr);
-    return result;
-}
-
-void AggregateFunctions::count_star_update(FunctionContext*, BigIntVal* dst) {
-    DCHECK(!dst->is_null);
-    ++dst->val;
-}
-
-void AggregateFunctions::count_star_remove(FunctionContext*, BigIntVal* dst) {
-    DCHECK(!dst->is_null);
-    --dst->val;
-    DCHECK_GE(dst->val, 0);
-}
-
-template <typename SRC_VAL, typename DST_VAL>
-void AggregateFunctions::sum(FunctionContext* ctx, const SRC_VAL& src, DST_VAL* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    if (dst->is_null) {
-        init_zero_not_null<DST_VAL>(ctx, dst);
-    }
-    dst->val += src.val;
-}
-
-template <>
-void AggregateFunctions::sum(FunctionContext* ctx, const DecimalV2Val& src, DecimalV2Val* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    if (dst->is_null) {
-        init_zero_not_null<DecimalV2Val>(ctx, dst);
-    }
-    DecimalV2Value new_src = DecimalV2Value::from_decimal_val(src);
-    DecimalV2Value new_dst = DecimalV2Value::from_decimal_val(*dst);
-    new_dst = new_dst + new_src;
-    new_dst.to_decimal_val(dst);
-}
-
-template <typename T>
-void AggregateFunctions::min_init(FunctionContext* ctx, T* dst) {
-    auto val = AnyValUtil::max_val<T>(ctx);
-    // set to null when intermediate slot is nullable
-    val.is_null = true;
-    *dst = val;
-}
-
-template <typename T>
-void AggregateFunctions::min(FunctionContext*, const T& src, T* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    if (dst->is_null || src.val < dst->val) {
-        *dst = src;
-    }
-}
-
-template <typename T>
-void AggregateFunctions::max_init(FunctionContext* ctx, T* dst) {
-    auto val = AnyValUtil::min_val<T>(ctx);
-    // set to null when intermediate slot is nullable
-    val.is_null = true;
-    *dst = val;
-}
-
-template <typename T>
-void AggregateFunctions::max(FunctionContext*, const T& src, T* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    if (dst->is_null || src.val > dst->val) {
-        *dst = src;
-    }
-}
-
-template <typename T>
-void AggregateFunctions::any_init(FunctionContext* ctx, T* dst) {
-    T val {};
-    // set to null when intermediate slot is nullable
-    val.is_null = true;
-    *dst = val;
-}
-
-template <typename T>
-void AggregateFunctions::any(FunctionContext*, const T& src, T* dst) {
-    if (LIKELY(!dst->is_null || src.is_null)) {
-        return;
-    }
-
-    *dst = src;
-}
-
-template <>
-void AggregateFunctions::min(FunctionContext*, const DecimalV2Val& src, DecimalV2Val* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    if (dst->is_null) {
-        *dst = src;
-    } else {
-        DecimalV2Value new_src = DecimalV2Value::from_decimal_val(src);
-        DecimalV2Value new_dst = DecimalV2Value::from_decimal_val(*dst);
-
-        if (new_src < new_dst) {
-            *dst = src;
-        }
-    }
-}
-
-template <>
-void AggregateFunctions::max(FunctionContext*, const DecimalV2Val& src, DecimalV2Val* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    if (dst->is_null) {
-        *dst = src;
-    } else {
-        DecimalV2Value new_src = DecimalV2Value::from_decimal_val(src);
-        DecimalV2Value new_dst = DecimalV2Value::from_decimal_val(*dst);
-
-        if (new_src > new_dst) {
-            *dst = src;
-        }
-    }
-}
-
-void AggregateFunctions::init_null_string(FunctionContext* c, StringVal* dst) {
-    dst->is_null = true;
-    dst->ptr = nullptr;
-    dst->len = 0;
-}
-
-template <>
-void AggregateFunctions::min(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    if (dst->is_null || StringRef(src) < StringRef(*dst)) {
-        if (!dst->is_null) {
-            ctx->free(dst->ptr);
-        }
-        uint8_t* copy = ctx->allocate(src.len);
-        memcpy(copy, src.ptr, src.len);
-        *dst = StringVal(copy, src.len);
-    }
-}
-
-template <>
-void AggregateFunctions::max(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    if (dst->is_null || StringRef(src) > StringRef(*dst)) {
-        if (!dst->is_null) {
-            ctx->free(dst->ptr);
-        }
-        uint8_t* copy = ctx->allocate(src.len);
-        memcpy(copy, src.ptr, src.len);
-        *dst = StringVal(copy, src.len);
-    }
-}
-
-template <>
-void AggregateFunctions::any(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
-    if (LIKELY(src.is_null || !dst->is_null)) {
-        return;
-    }
-
-    uint8_t* copy = ctx->allocate(src.len);
-    memcpy(copy, src.ptr, src.len);
-    *dst = StringVal(copy, src.len);
-}
-
-template <>
-void AggregateFunctions::min(FunctionContext*, const DateTimeVal& src, DateTimeVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    if (dst->is_null) {
-        *dst = src;
-        return;
-    }
-
-    DateTimeValue src_tv = DateTimeValue::from_datetime_val(src);
-    DateTimeValue dst_tv = DateTimeValue::from_datetime_val(*dst);
-
-    if (src_tv < dst_tv) {
-        *dst = src;
-    }
-}
-
-template <>
-void AggregateFunctions::max(FunctionContext*, const DateTimeVal& src, DateTimeVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    if (dst->is_null) {
-        *dst = src;
-        return;
-    }
-
-    DateTimeValue src_tv = DateTimeValue::from_datetime_val(src);
-    DateTimeValue dst_tv = DateTimeValue::from_datetime_val(*dst);
-
-    if (src_tv > dst_tv) {
-        *dst = src;
-    }
-}
-
-void AggregateFunctions::string_concat(FunctionContext* ctx, const StringVal& src,
-                                       const StringVal& separator, StringVal* result) {
-    if (src.is_null || separator.is_null) {
-        return;
-    }
-
-    if (result->is_null) {
-        uint8_t* copy = ctx->allocate(src.len);
-        memcpy(copy, src.ptr, src.len);
-        *result = StringVal(copy, src.len);
-        return;
-    }
-
-    const StringVal* sep_ptr = separator.is_null ? &DEFAULT_STRING_CONCAT_DELIM : &separator;
-
-    int new_size = result->len + sep_ptr->len + src.len;
-    result->ptr = ctx->reallocate(result->ptr, new_size);
-    memcpy(result->ptr + result->len, sep_ptr->ptr, sep_ptr->len);
-    result->len += sep_ptr->len;
-    memcpy(result->ptr + result->len, src.ptr, src.len);
-    result->len += src.len;
-}
-
-// StringConcat intermediate state starts with the length of the first
-// separator, followed by the accumulated string.  The accumulated
-// string starts with the separator of the first value that arrived in
-// StringConcatUpdate().
-using StringConcatHeader = int64_t;
-// Delimiter to use if the separator is nullptr.
-
-void AggregateFunctions::string_concat_update(FunctionContext* ctx, const StringVal& src,
-                                              StringVal* result) {
-    string_concat_update(ctx, src, DEFAULT_STRING_CONCAT_DELIM, result);
-}
-
-void AggregateFunctions::string_concat_update(FunctionContext* ctx, const StringVal& src,
-                                              const StringVal& separator, StringVal* result) {
-    if (src.is_null || separator.is_null) {
-        return;
-    }
-    const StringVal* sep = separator.is_null ? &DEFAULT_STRING_CONCAT_DELIM : &separator;
-    if (result->is_null || !result->ptr) {
-        // Header of the intermediate state holds the length of the first separator.
-        const auto header_len = sizeof(StringConcatHeader);
-        DCHECK(header_len == sizeof(sep->len));
-        *result = StringVal(ctx->allocate(header_len), header_len);
-        if (result->ptr) {
-            *reinterpret_cast<StringConcatHeader*>(result->ptr) = sep->len;
-        }
-    }
-    result->append(ctx, sep->ptr, sep->len, src.ptr, src.len);
-}
-
-void AggregateFunctions::string_concat_merge(FunctionContext* ctx, const StringVal& src,
-                                             StringVal* result) {
-    if (src.is_null) {
-        return;
-    }
-    const auto header_len = sizeof(StringConcatHeader);
-    if (result->is_null || !result->ptr) {
-        // Copy the header from the first intermediate value.
-        *result = StringVal(ctx->allocate(header_len), header_len);
-        if (result->is_null) {
-            return;
-        }
-        *reinterpret_cast<StringConcatHeader*>(result->ptr) =
-                *reinterpret_cast<StringConcatHeader*>(src.ptr);
-    }
-    // Append the string portion of the intermediate src to result (omit src's header).
-    result->append(ctx, src.ptr + header_len, src.len - header_len);
-}
-
-StringVal AggregateFunctions::string_concat_finalize(FunctionContext* ctx, const StringVal& src) {
-    if (src.is_null) {
-        return src;
-    }
-    const auto header_len = sizeof(StringConcatHeader);
-    DCHECK(src.len >= header_len);
-    int sep_len = *reinterpret_cast<StringConcatHeader*>(src.ptr);
-    DCHECK(src.len >= header_len + sep_len);
-    // Remove the header and the first separator.
-    StringVal result = StringVal::copy_from(ctx, src.ptr + header_len + sep_len,
-                                            src.len - header_len - sep_len);
-    ctx->free(src.ptr);
-    return result;
-}
-
-// Compute distinctpc and distinctpcsa using Flajolet and Martin's algorithm
-// (Probabilistic Counting Algorithms for Data Base Applications)
-// We have implemented two variants here: one with stochastic averaging (with PCSA
-// postfix) and one without.
-// There are 4 phases to compute the aggregate:
-//   1. allocate a bitmap, stored in the aggregation tuple's output string slot
-//   2. update the bitmap per row (UpdateDistinctEstimateSlot)
-//   3. for distributed plan, merge the bitmaps from all the nodes
-//      (UpdateMergeEstimateSlot)
-//   4. compute the estimate using the bitmaps when all the rows are processed
-//      (FinalizeEstimateSlot)
-const static int NUM_PC_BITMAPS = 64;   // number of bitmaps
-const static int PC_BITMAP_LENGTH = 32; // the length of each bit map
-const static float PC_THETA = 0.77351f; // the magic number to compute the final result
-
-void AggregateFunctions::pc_init(FunctionContext* c, StringVal* dst) {
-    // Initialize the distinct estimate bit map - Probabilistic Counting Algorithms for Data
-    // Base Applications (Flajolet and Martin)
-    //
-    // The bitmap is a 64bit(1st index) x 32bit(2nd index) matrix.
-    // So, the string length of 256 byte is enough.
-    // The layout is:
-    //   row  1: 8bit 8bit 8bit 8bit
-    //   row  2: 8bit 8bit 8bit 8bit
-    //   ...     ..
-    //   ...     ..
-    //   row 64: 8bit 8bit 8bit 8bit
-    //
-    // Using 32bit length, we can count up to 10^8. This will not be enough for Fact table
-    // primary key, but once we approach the limit, we could interpret the result as
-    // "every row is distinct".
-    //
-    // We use "string" type for DISTINCT_PC function so that we can use the string
-    // slot to hold the bitmaps.
-    dst->is_null = false;
-    int str_len = NUM_PC_BITMAPS * PC_BITMAP_LENGTH / 8;
-    dst->ptr = c->allocate(str_len);
-    dst->len = str_len;
-    memset(dst->ptr, 0, str_len);
-}
-
-static inline void set_distinct_estimate_bit(uint8_t* bitmap, uint32_t row_index,
-                                             uint32_t bit_index) {
-    // We need to convert Bitmap[alpha,index] into the index of the string.
-    // alpha tells which of the 32bit we've to jump to.
-    // index then lead us to the byte and bit.
-    uint32_t* int_bitmap = reinterpret_cast<uint32_t*>(bitmap);
-    int_bitmap[row_index] |= (1 << bit_index);
-}
-
-static inline bool get_distinct_estimate_bit(uint8_t* bitmap, uint32_t row_index,
-                                             uint32_t bit_index) {
-    uint32_t* int_bitmap = reinterpret_cast<uint32_t*>(bitmap);
-    return ((int_bitmap[row_index] & (1 << bit_index)) > 0);
-}
-
-template <typename T>
-void AggregateFunctions::pc_update(FunctionContext* c, const T& input, StringVal* dst) {
-    if (input.is_null) {
-        return;
-    }
-
-    // Core of the algorithm. This is a direct translation of the code in the paper.
-    // Please see the paper for details. For simple averaging, we need to compute hash
-    // values NUM_PC_BITMAPS times using NUM_PC_BITMAPS different hash functions (by using a
-    // different seed).
-    for (int i = 0; i < NUM_PC_BITMAPS; ++i) {
-        uint32_t hash_value = AnyValUtil::hash(input, i);
-        int bit_index = __builtin_ctz(hash_value);
-
-        if (UNLIKELY(hash_value == 0)) {
-            bit_index = PC_BITMAP_LENGTH - 1;
-        }
-
-        // Set bitmap[i, bit_index] to 1
-        set_distinct_estimate_bit(dst->ptr, i, bit_index);
-    }
-}
-
-template <typename T>
-void AggregateFunctions::pcsa_update(FunctionContext* c, const T& input, StringVal* dst) {
-    if (input.is_null) {
-        return;
-    }
-
-    // Core of the algorithm. This is a direct translation of the code in the paper.
-    // Please see the paper for details. Using stochastic averaging, we only need to
-    // the hash value once for each row.
-    uint32_t hash_value = AnyValUtil::hash(input, 0);
-    uint32_t row_index = hash_value % NUM_PC_BITMAPS;
-
-    // We want the zero-based position of the least significant 1-bit in binary
-    // representation of hash_value. __builtin_ctz does exactly this because it returns
-    // the number of trailing 0-bits in x (or undefined if x is zero).
-    int bit_index = __builtin_ctz(hash_value / NUM_PC_BITMAPS);
-
-    if (UNLIKELY(hash_value == 0)) {
-        bit_index = PC_BITMAP_LENGTH - 1;
-    }
-
-    // Set bitmap[row_index, bit_index] to 1
-    set_distinct_estimate_bit(dst->ptr, row_index, bit_index);
-}
-
-std::string distinct_estimate_bitmap_to_string(uint8_t* v) {
-    std::stringstream debugstr;
-
-    for (int i = 0; i < NUM_PC_BITMAPS; ++i) {
-        for (int j = 0; j < PC_BITMAP_LENGTH; ++j) {
-            // print bitmap[i][j]
-            debugstr << get_distinct_estimate_bit(v, i, j);
-        }
-
-        debugstr << "\n";
-    }
-
-    debugstr << "\n";
-    return debugstr.str();
-}
-
-void AggregateFunctions::pc_merge(FunctionContext* c, const StringVal& src, StringVal* dst) {
-    DCHECK(!src.is_null);
-    DCHECK(!dst->is_null);
-    DCHECK_EQ(src.len, NUM_PC_BITMAPS * PC_BITMAP_LENGTH / 8);
-
-    // Merge the bits
-    // I think _mm_or_ps can do it, but perf doesn't really matter here. We call this only
-    // once group per node.
-    for (int i = 0; i < NUM_PC_BITMAPS * PC_BITMAP_LENGTH / 8; ++i) {
-        *(dst->ptr + i) |= *(src.ptr + i);
-    }
-
-    VLOG_ROW << "UpdateMergeEstimateSlot Src Bit map:\n"
-             << distinct_estimate_bitmap_to_string(src.ptr);
-    VLOG_ROW << "UpdateMergeEstimateSlot Dst Bit map:\n"
-             << distinct_estimate_bitmap_to_string(dst->ptr);
-}
-
-double distinct_estimate_finalize(const StringVal& src) {
-    DCHECK(!src.is_null);
-    DCHECK_EQ(src.len, NUM_PC_BITMAPS * PC_BITMAP_LENGTH / 8);
-    VLOG_ROW << "FinalizeEstimateSlot Bit map:\n" << distinct_estimate_bitmap_to_string(src.ptr);
-
-    // We haven't processed any rows if none of the bits are set. Therefore, we have zero
-    // distinct rows. We're overwriting the result in the same string buffer we've
-    // allocated.
-    bool is_empty = true;
-
-    for (int i = 0; i < NUM_PC_BITMAPS * PC_BITMAP_LENGTH / 8; ++i) {
-        if (src.ptr[i] != 0) {
-            is_empty = false;
-            break;
-        }
-    }
-
-    if (is_empty) {
-        return 0;
-    }
-
-    // Convert the bitmap to a number, please see the paper for details
-    // In short, we count the average number of leading 1s (per row) in the bit map.
-    // The number is proportional to the log2(1/NUM_PC_BITMAPS of  the actual number of
-    // distinct).
-    // To get the actual number of distinct, we'll do 2^avg / PC_THETA.
-    // PC_THETA is a magic number.
-    int sum = 0;
-
-    for (int i = 0; i < NUM_PC_BITMAPS; ++i) {
-        int row_bit_count = 0;
-
-        // Count the number of leading ones for each row in the bitmap
-        // We could have used the build in __builtin_clz to count of number of leading zeros
-        // but we first need to invert the 1 and 0.
-        while (get_distinct_estimate_bit(src.ptr, i, row_bit_count) &&
-               row_bit_count < PC_BITMAP_LENGTH) {
-            ++row_bit_count;
-        }
-
-        sum += row_bit_count;
-    }
-
-    double avg = static_cast<double>(sum) / static_cast<double>(NUM_PC_BITMAPS);
-    double result = std::pow(static_cast<double>(2), avg) / PC_THETA;
-    return result;
-}
-
-StringVal AggregateFunctions::pc_finalize(FunctionContext* c, const StringVal& src) {
-    double estimate = distinct_estimate_finalize(src);
-    int64_t result = estimate;
-    // TODO: this should return bigint. this is a hack
-    std::stringstream ss;
-    ss << result;
-    std::string str = ss.str();
-    StringVal dst = src;
-    memcpy(dst.ptr, str.c_str(), str.length());
-    dst.len = str.length();
-    return dst;
-}
-
-StringVal AggregateFunctions::pcsa_finalize(FunctionContext* c, const StringVal& src) {
-    // When using stochastic averaging, the result has to be multiplied by NUM_PC_BITMAPS.
-    double estimate = distinct_estimate_finalize(src) * NUM_PC_BITMAPS;
-    int64_t result = estimate;
-    // TODO: this should return bigint. this is a hack
-    std::stringstream ss;
-    ss << result;
-    std::string str = ss.str();
-    StringVal dst = src;
-    memcpy(dst.ptr, str.c_str(), str.length());
-    dst.len = str.length();
-    return dst;
-}
-
-void AggregateFunctions::hll_init(FunctionContext* ctx, StringVal* dst) {
-    int str_len = std::pow(2, HLL_COLUMN_PRECISION);
-    dst->is_null = false;
-    dst->ptr = ctx->allocate(str_len);
-    dst->len = str_len;
-    memset(dst->ptr, 0, str_len);
-}
-
-template <typename T>
-void AggregateFunctions::hll_update(FunctionContext* ctx, const T& src, StringVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    DCHECK(!dst->is_null);
-    DCHECK_EQ(dst->len, HLL_REGISTERS_COUNT);
-    uint64_t hash_value = AnyValUtil::hash64_murmur(src, HashUtil::MURMUR_SEED);
-
-    if (hash_value != 0) {
-        int idx = hash_value % dst->len;
-        uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_COLUMN_PRECISION) + 1;
-        dst->ptr[idx] = (dst->ptr[idx] < first_one_bit ? first_one_bit : dst->ptr[idx]);
-    }
-}
-
-void AggregateFunctions::hll_merge(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
-    DCHECK(!dst->is_null);
-    DCHECK(!src.is_null);
-    DCHECK_EQ(dst->len, std::pow(2, HLL_COLUMN_PRECISION));
-    DCHECK_EQ(src.len, std::pow(2, HLL_COLUMN_PRECISION));
-
-    for (int i = 0; i < src.len; ++i) {
-        dst->ptr[i] = (dst->ptr[i] < src.ptr[i] ? src.ptr[i] : dst->ptr[i]);
-    }
-}
-
-StringVal AggregateFunctions::hll_finalize(FunctionContext* ctx, const StringVal& src) {
-    double estimate = hll_algorithm(src);
-    // Output the estimate as ascii string
-    std::stringstream out;
-    out << (int64_t)estimate;
-    std::string out_str = out.str();
-    StringVal result_str(ctx, out_str.size());
-    memcpy(result_str.ptr, out_str.c_str(), result_str.len);
-    return result_str;
-}
-
-void AggregateFunctions::hll_union_agg_init(FunctionContext* ctx, HllVal* dst) {
-    dst->init(ctx);
-}
-
-void AggregateFunctions::hll_union_agg_update(FunctionContext* ctx, const HllVal& src,
-                                              HllVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-    DCHECK(!dst->is_null);
-
-    dst->agg_parse_and_cal(ctx, src);
-    return;
-}
-
-void AggregateFunctions::hll_union_agg_merge(FunctionContext* ctx, const HllVal& src, HllVal* dst) {
-    DCHECK(!dst->is_null);
-    DCHECK(!src.is_null);
-    DCHECK_EQ(dst->len, HLL_COLUMN_DEFAULT_LEN);
-    DCHECK_EQ(src.len, HLL_COLUMN_DEFAULT_LEN);
-
-    dst->agg_merge(src);
-}
-
-doris_udf::BigIntVal AggregateFunctions::hll_union_agg_finalize(doris_udf::FunctionContext* ctx,
-                                                                const HllVal& src) {
-    double estimate = hll_algorithm(src);
-    BigIntVal result((int64_t)estimate);
-    return result;
-}
-
-int64_t AggregateFunctions::hll_algorithm(uint8_t* pdata, int data_len) {
-    DCHECK_EQ(data_len, HLL_REGISTERS_COUNT);
-
-    const int num_streams = HLL_REGISTERS_COUNT;
-    // Empirical constants for the algorithm.
-    float alpha = 0;
-
-    if (num_streams == 16) {
-        alpha = 0.673f;
-    } else if (num_streams == 32) {
-        alpha = 0.697f;
-    } else if (num_streams == 64) {
-        alpha = 0.709f;
-    } else {
-        alpha = 0.7213f / (1 + 1.079f / num_streams);
-    }
-
-    float harmonic_mean = 0;
-    int num_zero_registers = 0;
-
-    for (int i = 0; i < data_len; ++i) {
-        harmonic_mean += powf(2.0f, -pdata[i]);
-
-        if (pdata[i] == 0) {
-            ++num_zero_registers;
-        }
-    }
-
-    harmonic_mean = 1.0f / harmonic_mean;
-    double estimate = alpha * num_streams * num_streams * harmonic_mean;
-    // according to HyperLogLog current correction, if E is cardinal
-    // E =< num_streams * 2.5 , LC has higher accuracy.
-    // num_streams * 2.5 < E , HyperLogLog has higher accuracy.
-    // Generally , we can use HyperLogLog to produce value as E.
-    if (estimate <= num_streams * 2.5 && num_zero_registers != 0) {
-        // Estimated cardinality is too low. Hll is too inaccurate here, instead use
-        // linear counting.
-        estimate = num_streams * log(static_cast<float>(num_streams) / num_zero_registers);
-    } else if (num_streams == 16384 && estimate < 72000) {
-        // when Linear Count change to HyperLoglog according to HyperLogLog Correction,
-        // there are relatively large fluctuations, we fixed the problem refer to redis.
-        double bias = 5.9119 * 1.0e-18 * (estimate * estimate * estimate * estimate) -
-                      1.4253 * 1.0e-12 * (estimate * estimate * estimate) +
-                      1.2940 * 1.0e-7 * (estimate * estimate) - 5.2921 * 1.0e-3 * estimate +
-                      83.3216;
-        estimate -= estimate * (bias / 100);
-    }
-    return (int64_t)(estimate + 0.5);
-}
-
-void AggregateFunctions::hll_raw_agg_init(FunctionContext* ctx, HllVal* dst) {
-    hll_union_agg_init(ctx, dst);
-}
-
-void AggregateFunctions::hll_raw_agg_update(FunctionContext* ctx, const HllVal& src, HllVal* dst) {
-    hll_union_agg_update(ctx, src, dst);
-}
-
-void AggregateFunctions::hll_raw_agg_merge(FunctionContext* ctx, const HllVal& src, HllVal* dst) {
-    hll_union_agg_merge(ctx, src, dst);
-}
-
-doris_udf::HllVal AggregateFunctions::hll_raw_agg_finalize(doris_udf::FunctionContext* ctx,
-                                                           const HllVal& src) {
-    DCHECK(!src.is_null);
-    DCHECK_EQ(src.len, HLL_COLUMN_DEFAULT_LEN);
-
-    HllVal result;
-    result.init(ctx);
-    memcpy(result.ptr, src.ptr, src.len);
-    return result;
-}
-
-// TODO chenhao , reduce memory copy
-// multi distinct state for numeric
-// serialize order type:value:value:value ...
-template <typename T>
-class MultiDistinctNumericState {
-public:
-    static void create(StringVal* dst) {
-        dst->is_null = false;
-        const int state_size = sizeof(MultiDistinctNumericState<T>);
-        MultiDistinctNumericState<T>* state = new MultiDistinctNumericState<T>();
-        if (std::is_same<T, TinyIntVal>::value) {
-            state->_type = FunctionContext::TYPE_TINYINT;
-        } else if (std::is_same<T, SmallIntVal>::value) {
-            state->_type = FunctionContext::TYPE_SMALLINT;
-        } else if (std::is_same<T, IntVal>::value) {
-            state->_type = FunctionContext::TYPE_INT;
-        } else if (std::is_same<T, BigIntVal>::value) {
-            state->_type = FunctionContext::TYPE_BIGINT;
-        } else if (std::is_same<T, LargeIntVal>::value) {
-            state->_type = FunctionContext::TYPE_LARGEINT;
-        } else if (std::is_same<T, DoubleVal>::value) {
-            state->_type = FunctionContext::TYPE_DOUBLE;
-        } else if (std::is_same<T, FloatVal>::value) {
-            state->_type = FunctionContext::TYPE_FLOAT;
-        } else {
-            DCHECK(false);
-        }
-        dst->len = state_size;
-        dst->ptr = (uint8_t*)state;
-    }
-
-    static void destroy(const StringVal& dst) { delete (MultiDistinctNumericState<T>*)dst.ptr; }
-
-    void update(T& t) { _set.insert(t); }
-
-    // type:one byte  value:sizeof(T)
-    StringVal serialize(FunctionContext* ctx) {
-        size_t type_size = sizeof(((T*)0)->val);
-        const size_t serialized_set_length = sizeof(uint8_t) + type_size * _set.size();
-        StringVal result(ctx, serialized_set_length);
-        uint8_t* type_writer = result.ptr;
-        // type
-        *type_writer = (uint8_t)_type;
-        type_writer++;
-        // value
-        for (auto& value : _set) {
-            memcpy(type_writer, &value.val, type_size);
-            type_writer += type_size;
-        }
-        return result;
-    }
-
-    void unserialize(StringVal& src) {
-        size_t type_size = sizeof(((T*)0)->val);
-        const uint8_t* type_reader = src.ptr;
-        const uint8_t* end = src.ptr + src.len;
-        // type
-        _type = (FunctionContext::Type)*type_reader;
-        type_reader++;
-        // value
-        while (type_reader < end) {
-            T value;
-            value.is_null = false;
-            memcpy(&value.val, type_reader, type_size);
-            _set.insert(value);
-            type_reader += type_size;
-        }
-    }
-
-    // merge set
-    void merge(MultiDistinctNumericState& state) {
-        _set.insert(state._set.begin(), state._set.end());
-    }
-
-    // count
-    BigIntVal count_finalize() { return BigIntVal(_set.size()); }
-
-    // sum for double, decimal
-    DoubleVal sum_finalize_double() {
-        double sum = 0;
-        for (auto& value : _set) {
-            sum += value.val;
-        }
-        return DoubleVal(sum);
-    }
-
-    // sum for largeint
-    LargeIntVal sum_finalize_largeint() {
-        __int128 sum = 0;
-        for (auto& value : _set) {
-            sum += value.val;
-        }
-        return LargeIntVal(sum);
-    }
-
-    // sum for tinyint, smallint, int, bigint
-    BigIntVal sum_finalize_bigint() {
-        int64_t sum = 0;
-        for (auto& value : _set) {
-            sum += value.val;
-        }
-        return BigIntVal(sum);
-    }
-
-    FunctionContext::Type set_type() { return _type; }
-
-private:
-    class NumericHashHelper {
-    public:
-        size_t operator()(const T& obj) const {
-            size_t result = AnyValUtil::hash64_murmur(obj, HashUtil::MURMUR_SEED);
-            return result;
-        }
-    };
-
-    phmap::flat_hash_set<T, NumericHashHelper> _set;
-
-    // Because Anyval does not provide the hash function, in order
-    // to adopt the type different from the template, the pointer is used
-    // HybridSetBase* _set;
-    // _type is serialized into buffer by one byte
-    FunctionContext::Type _type;
-};
-
-// multi distinct state for string
-// serialize order type:len:value:len:value ...
-class MultiDistinctStringCountState {
-public:
-    static void create(StringVal* dst) {
-        dst->is_null = false;
-        const int state_size = sizeof(MultiDistinctStringCountState);
-        MultiDistinctStringCountState* state = new MultiDistinctStringCountState();
-        // type length
-        state->_type = FunctionContext::TYPE_STRING;
-        dst->len = state_size;
-        dst->ptr = (uint8_t*)state;
-    }
-
-    static void destroy(const StringVal& dst) { delete (MultiDistinctStringCountState*)dst.ptr; }
-
-    void update(StringRef* sv) { _set.insert(sv); }
-
-    StringVal serialize(FunctionContext* ctx) {
-        // calculate total serialize buffer length
-        int total_serialized_set_length = 1;
-        HybridSetBase::IteratorBase* iterator = _set.begin();
-        while (iterator->has_next()) {
-            const StringRef* value = reinterpret_cast<const StringRef*>(iterator->get_value());
-            total_serialized_set_length += STRING_LENGTH_RECORD_LENGTH + value->size;
-            iterator->next();
-        }
-        StringVal result(ctx, total_serialized_set_length);
-        uint8_t* writer = result.ptr;
-        // type
-        *writer = _type;
-        writer++;
-        iterator = _set.begin();
-        while (iterator->has_next()) {
-            const StringRef* value = reinterpret_cast<const StringRef*>(iterator->get_value());
-            // length, it is unnecessary to consider little or big endian for
-            // all running in little-endian.
-            *(int*)writer = value->size;
-            writer += STRING_LENGTH_RECORD_LENGTH;
-            // value
-            memcpy(writer, value->data, value->size);
-            writer += value->size;
-            iterator->next();
-        }
-        return result;
-    }
-
-    void unserialize(StringVal& src) {
-        uint8_t* reader = src.ptr;
-        // skip type ,no used now
-        _type = (FunctionContext::Type)*reader;
-        DCHECK(_type == FunctionContext::TYPE_STRING);
-        reader++;
-        const uint8_t* end = src.ptr + src.len;
-        while (reader < end) {
-            const int length = *(int*)reader;
-            reader += STRING_LENGTH_RECORD_LENGTH;
-            StringRef value((char*)reader, length);
-            _set.insert(&value);
-            reader += length;
-        }
-        DCHECK(reader == end);
-    }
-
-    // merge set
-    void merge(MultiDistinctStringCountState& state) { _set.insert(&(state._set)); }
-
-    BigIntVal finalize() { return BigIntVal(_set.size()); }
-
-    FunctionContext::Type set_type() { return _type; }
-
-    static const int STRING_LENGTH_RECORD_LENGTH = 4;
-
-private:
-    StringSet _set;
-    // _type is serialized into buffer by one byte
-    FunctionContext::Type _type;
-};
-
-class MultiDistinctDecimalV2State {
-public:
-    static void create(StringVal* dst) {
-        dst->is_null = false;
-        const int state_size = sizeof(MultiDistinctDecimalV2State);
-        MultiDistinctDecimalV2State* state = new MultiDistinctDecimalV2State();
-        state->_type = FunctionContext::TYPE_DECIMALV2;
-        dst->len = state_size;
-        dst->ptr = (uint8_t*)state;
-    }
-
-    static void destroy(const StringVal& dst) { delete (MultiDistinctDecimalV2State*)dst.ptr; }
-
-    void update(DecimalV2Val& t) { _set.insert(DecimalV2Value::from_decimal_val(t)); }
-
-    // type:one byte  value:sizeof(T)
-    StringVal serialize(FunctionContext* ctx) {
-        const int serialized_set_length = sizeof(uint8_t) + DECIMAL_BYTE_SIZE * _set.size();
-        StringVal result(ctx, serialized_set_length);
-        uint8_t* writer = result.ptr;
-        *writer = (uint8_t)_type;
-        writer++;
-        // for int_length and frac_length, uint8_t will not overflow.
-        for (auto& value : _set) {
-            __int128 v = value.value();
-            memcpy(writer, &v, DECIMAL_BYTE_SIZE);
-            writer += DECIMAL_BYTE_SIZE;
-        }
-        return result;
-    }
-
-    void unserialize(StringVal& src) {
-        const uint8_t* reader = src.ptr;
-        // type
-        _type = (FunctionContext::Type)*reader;
-        reader++;
-        const uint8_t* end = src.ptr + src.len;
-        // value
-        while (reader < end) {
-            __int128 v = 0;
-            memcpy(&v, reader, DECIMAL_BYTE_SIZE);
-            DecimalV2Value value(v);
-            reader += DECIMAL_BYTE_SIZE;
-            _set.insert(value);
-        }
-    }
-
-    FunctionContext::Type set_type() { return _type; }
-
-    // merge set
-    void merge(MultiDistinctDecimalV2State& state) {
-        _set.insert(state._set.begin(), state._set.end());
-    }
-
-    // count
-    BigIntVal count_finalize() { return BigIntVal(_set.size()); }
-
-    DecimalV2Val sum_finalize() {
-        DecimalV2Value sum(0);
-        for (auto& value : _set) {
-            sum += value;
-        }
-        DecimalV2Val result;
-        sum.to_decimal_val(&result);
-        return result;
-    }
-
-private:
-    const int DECIMAL_BYTE_SIZE = 16;
-
-    phmap::flat_hash_set<DecimalV2Value> _set;
-
-    FunctionContext::Type _type;
-};
-
-// multi distinct state for date
-// serialize order type:packed_time:type:packed_time:type ...
-class MultiDistinctCountDateState {
-public:
-    static void create(StringVal* dst) {
-        dst->is_null = false;
-        const int state_size = sizeof(MultiDistinctCountDateState);
-        MultiDistinctCountDateState* state = new MultiDistinctCountDateState();
-        state->_type = FunctionContext::TYPE_DATETIME;
-        dst->len = state_size;
-        dst->ptr = (uint8_t*)state;
-    }
-
-    static void destroy(const StringVal& dst) { delete (MultiDistinctCountDateState*)dst.ptr; }
-
-    void update(DateTimeVal& t) { _set.insert(t); }
-
-    // type:one byte  value:sizeof(T)
-    StringVal serialize(FunctionContext* ctx) {
-        const int serialized_set_length =
-                sizeof(uint8_t) +
-                (DATETIME_PACKED_TIME_BYTE_SIZE + DATETIME_TYPE_BYTE_SIZE) * _set.size();
-        StringVal result(ctx, serialized_set_length);
-        uint8_t* writer = result.ptr;
-        // type
-        *writer = (uint8_t)_type;
-        writer++;
-        // value
-        for (auto& value : _set) {
-            int64_t* packed_time_writer = (int64_t*)writer;
-            *packed_time_writer = value.packed_time;
-            writer += DATETIME_PACKED_TIME_BYTE_SIZE;
-            int* type_writer = (int*)writer;
-            *type_writer = value.type;
-            writer += DATETIME_TYPE_BYTE_SIZE;
-        }
-        return result;
-    }
-
-    void unserialize(StringVal& src) {
-        const uint8_t* reader = src.ptr;
-        // type
-        _type = (FunctionContext::Type)*reader;
-        reader++;
-        const uint8_t* end = src.ptr + src.len;
-        // value
-        while (reader < end) {
-            DateTimeVal value;
-            value.is_null = false;
-            int64_t* packed_time_reader = (int64_t*)reader;
-            value.packed_time = *packed_time_reader;
-            reader += DATETIME_PACKED_TIME_BYTE_SIZE;
-            int* type_reader = (int*)reader;
-            value.type = *type_reader;
-            reader += DATETIME_TYPE_BYTE_SIZE;
-            _set.insert(value);
-        }
-    }
-
-    // merge set
-    void merge(MultiDistinctCountDateState& state) {
-        _set.insert(state._set.begin(), state._set.end());
-    }
-
-    // count
-    BigIntVal count_finalize() { return BigIntVal(_set.size()); }
-
-    FunctionContext::Type set_type() { return _type; }
-
-private:
-    class DateTimeHashHelper {
-    public:
-        size_t operator()(const DateTimeVal& obj) const {
-            size_t result = AnyValUtil::hash64_murmur(obj, HashUtil::MURMUR_SEED);
-            return result;
-        }
-    };
-
-    const int DATETIME_PACKED_TIME_BYTE_SIZE = 8;
-    const int DATETIME_TYPE_BYTE_SIZE = 4;
-
-    phmap::flat_hash_set<DateTimeVal, DateTimeHashHelper> _set;
-
-    FunctionContext::Type _type;
-};
-
-template <typename T>
-void AggregateFunctions::count_or_sum_distinct_numeric_init(FunctionContext* ctx, StringVal* dst) {
-    MultiDistinctNumericState<T>::create(dst);
-}
-
-void AggregateFunctions::count_distinct_string_init(FunctionContext* ctx, StringVal* dst) {
-    MultiDistinctStringCountState::create(dst);
-}
-
-void AggregateFunctions::count_or_sum_distinct_decimalv2_init(FunctionContext* ctx,
-                                                              StringVal* dst) {
-    MultiDistinctDecimalV2State::create(dst);
-}
-
-void AggregateFunctions::count_distinct_date_init(FunctionContext* ctx, StringVal* dst) {
-    MultiDistinctCountDateState::create(dst);
-}
-
-template <typename T>
-void AggregateFunctions::count_or_sum_distinct_numeric_update(FunctionContext* ctx, T& src,
-                                                              StringVal* dst) {
-    DCHECK(!dst->is_null);
-    if (src.is_null) return;
-    MultiDistinctNumericState<T>* state = reinterpret_cast<MultiDistinctNumericState<T>*>(dst->ptr);
-    state->update(src);
-}
-
-void AggregateFunctions::count_distinct_string_update(FunctionContext* ctx, StringVal& src,
-                                                      StringVal* dst) {
-    DCHECK(!dst->is_null);
-    if (src.is_null) return;
-    MultiDistinctStringCountState* state =
-            reinterpret_cast<MultiDistinctStringCountState*>(dst->ptr);
-    StringRef sv = StringRef(src);
-    state->update(&sv);
-}
-
-void AggregateFunctions::count_or_sum_distinct_decimalv2_update(FunctionContext* ctx,
-                                                                DecimalV2Val& src, StringVal* dst) {
-    DCHECK(!dst->is_null);
-    if (src.is_null) return;
-    MultiDistinctDecimalV2State* state = reinterpret_cast<MultiDistinctDecimalV2State*>(dst->ptr);
-    state->update(src);
-}
-
-void AggregateFunctions::count_distinct_date_update(FunctionContext* ctx, DateTimeVal& src,
-                                                    StringVal* dst) {
-    DCHECK(!dst->is_null);
-    if (src.is_null) return;
-    MultiDistinctCountDateState* state = reinterpret_cast<MultiDistinctCountDateState*>(dst->ptr);
-    state->update(src);
-}
-
-template <typename T>
-void AggregateFunctions::count_or_sum_distinct_numeric_merge(FunctionContext* ctx, StringVal& src,
-                                                             StringVal* dst) {
-    DCHECK(!dst->is_null);
-    DCHECK(!src.is_null);
-    MultiDistinctNumericState<T>* dst_state =
-            reinterpret_cast<MultiDistinctNumericState<T>*>(dst->ptr);
-    // unserialize src
-    StringVal src_state_val;
-    MultiDistinctNumericState<T>::create(&src_state_val);
-    MultiDistinctNumericState<T>* src_state =
-            reinterpret_cast<MultiDistinctNumericState<T>*>(src_state_val.ptr);
-    src_state->unserialize(src);
-    DCHECK(dst_state->set_type() == src_state->set_type());
-    dst_state->merge(*src_state);
-    MultiDistinctNumericState<T>::destroy(src_state_val);
-}
-
-void AggregateFunctions::count_distinct_string_merge(FunctionContext* ctx, StringVal& src,
-                                                     StringVal* dst) {
-    DCHECK(!dst->is_null);
-    DCHECK(!src.is_null);
-    MultiDistinctStringCountState* dst_state =
-            reinterpret_cast<MultiDistinctStringCountState*>(dst->ptr);
-    // unserialize src
-    StringVal src_state_val;
-    MultiDistinctStringCountState::create(&src_state_val);
-    MultiDistinctStringCountState* src_state =
-            reinterpret_cast<MultiDistinctStringCountState*>(src_state_val.ptr);
-    src_state->unserialize(src);
-    DCHECK(dst_state->set_type() == src_state->set_type());
-    dst_state->merge(*src_state);
-    MultiDistinctStringCountState::destroy(src_state_val);
-}
-
-void AggregateFunctions::count_or_sum_distinct_decimalv2_merge(FunctionContext* ctx, StringVal& src,
-                                                               StringVal* dst) {
-    DCHECK(!dst->is_null);
-    DCHECK(!src.is_null);
-    MultiDistinctDecimalV2State* dst_state =
-            reinterpret_cast<MultiDistinctDecimalV2State*>(dst->ptr);
-    // unserialize src
-    StringVal src_state_val;
-    MultiDistinctDecimalV2State::create(&src_state_val);
-    MultiDistinctDecimalV2State* src_state =
-            reinterpret_cast<MultiDistinctDecimalV2State*>(src_state_val.ptr);
-    src_state->unserialize(src);
-    DCHECK(dst_state->set_type() == src_state->set_type());
-    dst_state->merge(*src_state);
-    MultiDistinctDecimalV2State::destroy(src_state_val);
-}
-
-void AggregateFunctions::count_distinct_date_merge(FunctionContext* ctx, StringVal& src,
-                                                   StringVal* dst) {
-    DCHECK(!dst->is_null);
-    DCHECK(!src.is_null);
-    MultiDistinctCountDateState* dst_state =
-            reinterpret_cast<MultiDistinctCountDateState*>(dst->ptr);
-    // unserialize src
-    StringVal src_state_val;
-    MultiDistinctCountDateState::create(&src_state_val);
-    MultiDistinctCountDateState* src_state =
-            reinterpret_cast<MultiDistinctCountDateState*>(src_state_val.ptr);
-    src_state->unserialize(src);
-    DCHECK(dst_state->set_type() == src_state->set_type());
-    dst_state->merge(*src_state);
-    MultiDistinctCountDateState::destroy(src_state_val);
-}
-
-template <typename T>
-StringVal AggregateFunctions::count_or_sum_distinct_numeric_serialize(FunctionContext* ctx,
-                                                                      const StringVal& state_sv) {
-    DCHECK(!state_sv.is_null);
-    MultiDistinctNumericState<T>* state =
-            reinterpret_cast<MultiDistinctNumericState<T>*>(state_sv.ptr);
-    StringVal result = state->serialize(ctx);
-    // release original object
-    MultiDistinctNumericState<T>::destroy(state_sv);
-    return result;
-}
-
-StringVal AggregateFunctions::count_distinct_string_serialize(FunctionContext* ctx,
-                                                              const StringVal& state_sv) {
-    DCHECK(!state_sv.is_null);
-    MultiDistinctStringCountState* state =
-            reinterpret_cast<MultiDistinctStringCountState*>(state_sv.ptr);
-    StringVal result = state->serialize(ctx);
-    // release original object
-    MultiDistinctStringCountState::destroy(state_sv);
-    return result;
-}
-
-StringVal AggregateFunctions::count_or_sum_distinct_decimalv2_serialize(FunctionContext* ctx,
-                                                                        const StringVal& state_sv) {
-    DCHECK(!state_sv.is_null);
-    MultiDistinctDecimalV2State* state =
-            reinterpret_cast<MultiDistinctDecimalV2State*>(state_sv.ptr);
-    StringVal result = state->serialize(ctx);
-    // release original object
-    MultiDistinctDecimalV2State::destroy(state_sv);
-    return result;
-}
-
-StringVal AggregateFunctions::count_distinct_date_serialize(FunctionContext* ctx,
-                                                            const StringVal& state_sv) {
-    DCHECK(!state_sv.is_null);
-    MultiDistinctCountDateState* state =
-            reinterpret_cast<MultiDistinctCountDateState*>(state_sv.ptr);
-    StringVal result = state->serialize(ctx);
-    // release original object
-    MultiDistinctCountDateState::destroy(state_sv);
-    return result;
-}
-
-template <typename T>
-BigIntVal AggregateFunctions::count_or_sum_distinct_numeric_finalize(FunctionContext* ctx,
-                                                                     const StringVal& state_sv) {
-    MultiDistinctNumericState<T>* state =
-            reinterpret_cast<MultiDistinctNumericState<T>*>(state_sv.ptr);
-    BigIntVal result = state->count_finalize();
-    MultiDistinctNumericState<T>::destroy(state_sv);
-    return result;
-}
-
-BigIntVal AggregateFunctions::count_distinct_string_finalize(FunctionContext* ctx,
-                                                             const StringVal& state_sv) {
-    MultiDistinctStringCountState* state =
-            reinterpret_cast<MultiDistinctStringCountState*>(state_sv.ptr);
-    BigIntVal result = state->finalize();
-    MultiDistinctStringCountState::destroy(state_sv);
-    return result;
-}
-
-template <typename T>
-DoubleVal AggregateFunctions::sum_distinct_double_finalize(FunctionContext* ctx,
-                                                           const StringVal& state_sv) {
-    MultiDistinctNumericState<T>* state =
-            reinterpret_cast<MultiDistinctNumericState<T>*>(state_sv.ptr);
-    DoubleVal result = state->sum_finalize_double();
-    MultiDistinctNumericState<T>::destroy(state_sv);
-    return result;
-}
-
-template <typename T>
-LargeIntVal AggregateFunctions::sum_distinct_largeint_finalize(FunctionContext* ctx,
-                                                               const StringVal& state_sv) {
-    MultiDistinctNumericState<T>* state =
-            reinterpret_cast<MultiDistinctNumericState<T>*>(state_sv.ptr);
-    LargeIntVal result = state->sum_finalize_largeint();
-    MultiDistinctNumericState<T>::destroy(state_sv);
-    return result;
-}
-
-template <typename T>
-BigIntVal AggregateFunctions::sum_distinct_bigint_finalize(FunctionContext* ctx,
-                                                           const StringVal& state_sv) {
-    MultiDistinctNumericState<T>* state =
-            reinterpret_cast<MultiDistinctNumericState<T>*>(state_sv.ptr);
-    BigIntVal result = state->sum_finalize_bigint();
-    MultiDistinctNumericState<T>::destroy(state_sv);
-    return result;
-}
-
-BigIntVal AggregateFunctions::count_distinct_decimalv2_finalize(FunctionContext* ctx,
-                                                                const StringVal& state_sv) {
-    MultiDistinctDecimalV2State* state =
-            reinterpret_cast<MultiDistinctDecimalV2State*>(state_sv.ptr);
-    BigIntVal result = state->count_finalize();
-    MultiDistinctDecimalV2State::destroy(state_sv);
-    return result;
-}
-
-DecimalV2Val AggregateFunctions::sum_distinct_decimalv2_finalize(FunctionContext* ctx,
-                                                                 const StringVal& state_sv) {
-    MultiDistinctDecimalV2State* state =
-            reinterpret_cast<MultiDistinctDecimalV2State*>(state_sv.ptr);
-    DecimalV2Val result = state->sum_finalize();
-    MultiDistinctDecimalV2State::destroy(state_sv);
-    return result;
-}
-
-BigIntVal AggregateFunctions::count_distinct_date_finalize(FunctionContext* ctx,
-                                                           const StringVal& state_sv) {
-    MultiDistinctCountDateState* state =
-            reinterpret_cast<MultiDistinctCountDateState*>(state_sv.ptr);
-    BigIntVal result = state->count_finalize();
-    MultiDistinctCountDateState::destroy(state_sv);
-    return result;
-}
-
-// An implementation of a simple single pass variance algorithm. A standard UDA must
-// be single pass (i.e. does not scan the table more than once), so the most canonical
-// two pass approach is not practical.
-struct KnuthVarianceState {
-    double mean;
-    double m2;
-    int64_t count;
-};
-
-// Use Decimal to store the intermediate results of the variance algorithm
-struct DecimalV2KnuthVarianceState {
-    DecimalV2Val mean;
-    DecimalV2Val m2;
-    int64_t count = 0;
-};
-
-// Set pop=true for population variance, false for sample variance
-static double compute_knuth_variance(const KnuthVarianceState& state, bool pop) {
-    // Return zero for 1 tuple specified by
-    // http://docs.oracle.com/cd/B19306_01/server.102/b14200/functions212.htm
-    if (state.count == 1) return 0.0;
-    if (pop) return state.m2 / state.count;
-    return state.m2 / (state.count - 1);
-}
-
-// The algorithm is the same as above, using decimal as the intermediate variable
-static DecimalV2Value decimalv2_compute_knuth_variance(const DecimalV2KnuthVarianceState& state,
-                                                       bool pop) {
-    DecimalV2Value new_count = DecimalV2Value();
-    if (state.count == 1) return new_count;
-    new_count.assign_from_double(state.count);
-    DecimalV2Value new_m2 = DecimalV2Value::from_decimal_val(state.m2);
-    if (pop)
-        return new_m2 / new_count;
-    else
-        return new_m2 / new_count.assign_from_double(state.count - 1);
-}
-
-void AggregateFunctions::knuth_var_init(FunctionContext* ctx, StringVal* dst) {
-    dst->is_null = false;
-    // TODO(zc)
-    dst->len = sizeof(KnuthVarianceState);
-    dst->ptr = ctx->allocate(dst->len);
-    DCHECK_EQ(dst->len, sizeof(KnuthVarianceState));
-    memset(dst->ptr, 0, dst->len);
-}
-
-void AggregateFunctions::decimalv2_knuth_var_init(FunctionContext* ctx, StringVal* dst) {
-    dst->is_null = false;
-    dst->len = sizeof(DecimalV2KnuthVarianceState);
-    // The memory for int128 need to be aligned by 16.
-    // So the constructor has been used instead of allocating memory.
-    // Also, it will be release in finalize.
-    dst->ptr = (uint8_t*)new DecimalV2KnuthVarianceState;
-}
-
-template <typename T>
-void AggregateFunctions::knuth_var_update(FunctionContext* ctx, const T& src, StringVal* dst) {
-    DCHECK(!dst->is_null);
-    DCHECK_EQ(dst->len, sizeof(KnuthVarianceState));
-    if (src.is_null) return;
-    KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(dst->ptr);
-    double temp = 1 + state->count;
-    double delta = src.val - state->mean;
-    double r = delta / temp;
-    state->mean += r;
-    state->m2 += state->count * delta * r;
-    state->count = temp;
-}
-
-template <typename T>
-void AggregateFunctions::knuth_var_remove(FunctionContext* context, const T& src, StringVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-    KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(dst->ptr);
-    double count = state->count - 1;
-    double mean = (state->mean * (count + 1) - src.val) / count;
-    double m2 = state->m2 - ((count * (src.val - mean) * (src.val - mean)) / (count + 1));
-    state->m2 = m2;
-    state->mean = mean;
-    state->count = count;
-}
-
-void AggregateFunctions::knuth_var_remove(FunctionContext* ctx, const DecimalV2Val& src,
-                                          StringVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-    DecimalV2KnuthVarianceState* state = reinterpret_cast<DecimalV2KnuthVarianceState*>(dst->ptr);
-
-    DecimalV2Value now_src = DecimalV2Value::from_decimal_val(src);
-    DecimalV2Value now_mean = DecimalV2Value::from_decimal_val(state->mean);
-    DecimalV2Value now_m2 = DecimalV2Value::from_decimal_val(state->m2);
-    DecimalV2Value now_count = DecimalV2Value();
-    now_count.assign_from_double(state->count);
-    DecimalV2Value now_count_minus = DecimalV2Value();
-    now_count_minus.assign_from_double(state->count - 1);
-
-    DecimalV2Value decimal_mean = (now_mean * now_count - now_src) / now_count_minus;
-    DecimalV2Value decimal_m2 =
-            now_m2 -
-            ((now_count_minus * (now_src - decimal_mean) * (now_src - decimal_mean)) / now_count);
-
-    decimal_m2.to_decimal_val(&state->m2);
-    decimal_mean.to_decimal_val(&state->mean);
-    --state->count;
-}
-
-// Incrementally folds one DECIMALV2 value into the Knuth variance state stored in dst.
-// A NULL src leaves the state untouched. With n = state->count before the call:
-//   delta = src - mean;  step = delta / (n + 1);  mean += step;  m2 += n * delta * step;
-// after which the count is incremented and mean/m2 are written back.
-void AggregateFunctions::knuth_var_update(FunctionContext* ctx, const DecimalV2Val& src,
-                                          StringVal* dst) {
-    DCHECK(!dst->is_null);
-    DCHECK_EQ(dst->len, sizeof(DecimalV2KnuthVarianceState));
-    if (src.is_null) {
-        return;
-    }
-    auto* state = reinterpret_cast<DecimalV2KnuthVarianceState*>(dst->ptr);
-
-    // Lift the incoming value and the stored fields into DecimalV2Value arithmetic.
-    DecimalV2Value value = DecimalV2Value::from_decimal_val(src);
-    DecimalV2Value mean = DecimalV2Value::from_decimal_val(state->mean);
-    DecimalV2Value m2 = DecimalV2Value::from_decimal_val(state->m2);
-    DecimalV2Value prev_count = DecimalV2Value();
-    prev_count.assign_from_double(state->count);
-    DecimalV2Value next_count = DecimalV2Value();
-    next_count.assign_from_double(1 + state->count);
-
-    DecimalV2Value delta = value - mean;
-    DecimalV2Value step = delta / next_count;
-    mean += step;
-    // NOTE: the product below can overflow DecimalV2. On overflow m2 becomes
-    // 9223372036854775807999999999, the largest value DecimalV2Value can represent. A double
-    // intermediate would not overflow, since it falls back to scientific notation.
-    // Spark either returns NULL or raises an error on decimal overflow, selectable by a
-    // parameter; see https://cloud.tencent.com/developer/news/483615
-    m2 += prev_count * delta * step;
-
-    ++state->count;
-    mean.to_decimal_val(&state->mean);
-    m2.to_decimal_val(&state->m2);
-}
-
-// Merges the partial double-based Knuth variance state in src into dst, following the
-// pairwise update described at
-// http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
-// A src state with count == 0 leaves dst unchanged.
-void AggregateFunctions::knuth_var_merge(FunctionContext* ctx, const StringVal& src,
-                                         StringVal* dst) {
-    DCHECK(!dst->is_null);
-    DCHECK_EQ(dst->len, sizeof(KnuthVarianceState));
-    DCHECK(!src.is_null);
-    DCHECK_EQ(src.len, sizeof(KnuthVarianceState));
-    auto* incoming = reinterpret_cast<KnuthVarianceState*>(src.ptr);
-    auto* merged = reinterpret_cast<KnuthVarianceState*>(dst->ptr);
-    if (incoming->count != 0) {
-        const double mean_gap = merged->mean - incoming->mean;
-        const double total = merged->count + incoming->count;
-        // Both weights read merged->count, so compute them before it is overwritten.
-        const double merged_share = merged->count / total;
-        const double cross_weight = incoming->count * merged->count / total;
-        merged->mean = incoming->mean + mean_gap * merged_share;
-        merged->m2 = (incoming->m2) + merged->m2 + (mean_gap * mean_gap) * cross_weight;
-        merged->count = total;
-    }
-}
-
-// DECIMALV2 counterpart of knuth_var_merge: combines the partial state carried by src into
-// the state held by dst. src's bytes are copied into a local struct with memcpy before being
-// read; a src state with count == 0 leaves dst unchanged.
-void AggregateFunctions::decimalv2_knuth_var_merge(FunctionContext* ctx, const StringVal& src,
-                                                   StringVal* dst) {
-    DecimalV2KnuthVarianceState incoming;
-    memcpy(&incoming, src.ptr, sizeof(DecimalV2KnuthVarianceState));
-    DCHECK(!dst->is_null);
-    DCHECK_EQ(dst->len, sizeof(DecimalV2KnuthVarianceState));
-    auto* merged = reinterpret_cast<DecimalV2KnuthVarianceState*>(dst->ptr);
-    if (incoming.count == 0) {
-        return;
-    }
-
-    // Means and sums of squared deviations as DecimalV2Value.
-    DecimalV2Value incoming_mean = DecimalV2Value::from_decimal_val(incoming.mean);
-    DecimalV2Value merged_mean = DecimalV2Value::from_decimal_val(merged->mean);
-    DecimalV2Value incoming_m2 = DecimalV2Value::from_decimal_val(incoming.m2);
-    DecimalV2Value merged_m2 = DecimalV2Value::from_decimal_val(merged->m2);
-    // Counts are converted too, so the weights below are computed in decimal arithmetic.
-    DecimalV2Value incoming_n = DecimalV2Value();
-    incoming_n.assign_from_double(incoming.count);
-    DecimalV2Value merged_n = DecimalV2Value();
-    merged_n.assign_from_double(merged->count);
-
-    DecimalV2Value mean_gap = merged_mean - incoming_mean;
-    DecimalV2Value total_n = merged_n + incoming_n;
-    merged_mean = incoming_mean + mean_gap * (merged_n / total_n);
-    merged_m2 = (incoming_m2) + merged_m2 +
-                (mean_gap * mean_gap) * (incoming_n * merged_n / total_n);
-
-    merged->count += incoming.count;
-    merged_mean.to_decimal_val(&merged->mean);
-    merged_m2.to_decimal_val(&merged->m2);
-}
-
-// Returns the variance computed by compute_knuth_variance(state, false), or NULL when the
-// state holds zero or one value. (The `false` flag presumably selects the sample variance;
-// the helper is defined outside this view.)
-DoubleVal AggregateFunctions::knuth_var_get_value(FunctionContext* ctx, const StringVal& state_sv) {
-    auto* knuth = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr);
-    if (knuth->count != 0 && knuth->count != 1) {
-        return DoubleVal(compute_knuth_variance(*knuth, false));
-    }
-    return DoubleVal::null();
-}
-
-// Produces the final value via knuth_var_get_value, then releases the state buffer
-// through ctx->free().
-DoubleVal AggregateFunctions::knuth_var_finalize(FunctionContext* ctx, const StringVal& state_sv) {
-    const DoubleVal variance = knuth_var_get_value(ctx, state_sv);
-    ctx->free(state_sv.ptr);
-    return variance;
-}
-
-// DECIMALV2 variant of knuth_var_get_value: NULL for zero or one value, otherwise the
-// result of decimalv2_compute_knuth_variance(state, false) converted to DecimalV2Val.
-DecimalV2Val AggregateFunctions::decimalv2_knuth_var_get_value(FunctionContext* ctx,
-                                                               const StringVal& state_sv) {
-    DCHECK_EQ(state_sv.len, sizeof(DecimalV2KnuthVarianceState));
-    auto* knuth = reinterpret_cast<DecimalV2KnuthVarianceState*>(state_sv.ptr);
-    if (knuth->count != 0 && knuth->count != 1) {
-        DecimalV2Value variance = decimalv2_compute_knuth_variance(*knuth, false);
-        DecimalV2Val out;
-        variance.to_decimal_val(&out);
-        return out;
-    }
-    return DecimalV2Val::null();
-}
-
-// Produces the final DECIMALV2 variance, then destroys the state object with `delete`
-// (unlike the double variants, which release their buffer through ctx->free()).
-DecimalV2Val AggregateFunctions::decimalv2_knuth_var_finalize(FunctionContext* ctx,
-                                                              const StringVal& state_sv) {
-    const DecimalV2Val variance = decimalv2_knuth_var_get_value(ctx, state_sv);
-    delete reinterpret_cast<DecimalV2KnuthVarianceState*>(state_sv.ptr);
-    return variance;
-}
-
-// Returns compute_knuth_variance(state, true), or NULL when the state is empty.
-// (The `true` flag presumably selects the population variance; the helper is outside
-// this view.)
-DoubleVal AggregateFunctions::knuth_var_pop_get_value(FunctionContext* ctx,
-                                                      const StringVal& state_sv) {
-    DCHECK_EQ(state_sv.len, sizeof(KnuthVarianceState));
-    auto* knuth = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr);
-    return knuth->count == 0 ? DoubleVal::null()
-                             : DoubleVal(compute_knuth_variance(*knuth, true));
-}
-
-// Produces the final value via knuth_var_pop_get_value, then releases the state buffer
-// through ctx->free().
-DoubleVal AggregateFunctions::knuth_var_pop_finalize(FunctionContext* ctx,
-                                                     const StringVal& state_sv) {
-    const DoubleVal variance = knuth_var_pop_get_value(ctx, state_sv);
-    ctx->free(state_sv.ptr);
-    return variance;
-}
-
-// DECIMALV2 variant of knuth_var_pop_get_value: NULL for an empty state, otherwise the
-// result of decimalv2_compute_knuth_variance(state, true) converted to DecimalV2Val.
-DecimalV2Val AggregateFunctions::decimalv2_knuth_var_pop_get_value(FunctionContext* ctx,
-                                                                   const StringVal& state_sv) {
-    DCHECK_EQ(state_sv.len, sizeof(DecimalV2KnuthVarianceState));
-    auto* knuth = reinterpret_cast<DecimalV2KnuthVarianceState*>(state_sv.ptr);
-    if (knuth->count != 0) {
-        DecimalV2Value variance = decimalv2_compute_knuth_variance(*knuth, true);
-        DecimalV2Val out;
-        variance.to_decimal_val(&out);
-        return out;
-    }
-    return DecimalV2Val::null();
-}
-
-// Produces the final DECIMALV2 value via decimalv2_knuth_var_pop_get_value, then destroys
-// the state object with `delete`.
-DecimalV2Val AggregateFunctions::decimalv2_knuth_var_pop_finalize(FunctionContext* ctx,
-                                                                  const StringVal& state_sv) {
-    const DecimalV2Val variance = decimalv2_knuth_var_pop_get_value(ctx, state_sv);
-    delete reinterpret_cast<DecimalV2KnuthVarianceState*>(state_sv.ptr);
-    return variance;
-}
-
-// Returns the square root of compute_knuth_variance(state, false), or NULL when the state
-// holds zero or one value.
-DoubleVal AggregateFunctions::knuth_stddev_get_value(FunctionContext* ctx,
-                                                     const StringVal& state_sv) {
-    DCHECK_EQ(state_sv.len, sizeof(KnuthVarianceState));
-    auto* knuth = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr);
-    if (knuth->count != 0 && knuth->count != 1) {
-        const double stddev = sqrt(compute_knuth_variance(*knuth, false));
-        return DoubleVal(stddev);
-    }
-    return DoubleVal::null();
-}
-
-// Produces the final value via knuth_stddev_get_value, then releases the state buffer
-// through ctx->free().
-DoubleVal AggregateFunctions::knuth_stddev_finalize(FunctionContext* ctx,
-                                                    const StringVal& state_sv) {
-    const DoubleVal stddev = knuth_stddev_get_value(ctx, state_sv);
-    ctx->free(state_sv.ptr);
-    return stddev;
-}
-
-// DECIMALV2 variant of knuth_stddev_get_value: NULL for zero or one value, otherwise
-// DecimalV2Value::sqrt of decimalv2_compute_knuth_variance(state, false).
-DecimalV2Val AggregateFunctions::decimalv2_knuth_stddev_get_value(FunctionContext* ctx,
-                                                                  const StringVal& state_sv) {
-    DCHECK_EQ(state_sv.len, sizeof(DecimalV2KnuthVarianceState));
-    auto* knuth = reinterpret_cast<DecimalV2KnuthVarianceState*>(state_sv.ptr);
-    if (knuth->count != 0 && knuth->count != 1) {
-        DecimalV2Value variance = decimalv2_compute_knuth_variance(*knuth, false);
-        DecimalV2Value stddev = DecimalV2Value::sqrt(variance);
-        DecimalV2Val out;
-        stddev.to_decimal_val(&out);
-        return out;
-    }
-    return DecimalV2Val::null();
-}
-
-// Produces the final DECIMALV2 value via decimalv2_knuth_stddev_get_value, then destroys
-// the state object with `delete`.
-DecimalV2Val AggregateFunctions::decimalv2_knuth_stddev_finalize(FunctionContext* ctx,
-                                                                 const StringVal& state_sv) {
-    const DecimalV2Val stddev = decimalv2_knuth_stddev_get_value(ctx, state_sv);
-    delete reinterpret_cast<DecimalV2KnuthVarianceState*>(state_sv.ptr);
-    return stddev;
-}
-
-// Returns the square root of compute_knuth_variance(state, true), or NULL when the state
-// is empty.
-DoubleVal AggregateFunctions::knuth_stddev_pop_get_value(FunctionContext* ctx,
-                                                         const StringVal& state_sv) {
-    DCHECK_EQ(state_sv.len, sizeof(KnuthVarianceState));
-    auto* knuth = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr);
-    if (knuth->count != 0) {
-        const double stddev = sqrt(compute_knuth_variance(*knuth, true));
-        return DoubleVal(stddev);
-    }
-    return DoubleVal::null();
-}
-
-// Produces the final value via knuth_stddev_pop_get_value, then releases the state buffer
-// through ctx->free().
-DoubleVal AggregateFunctions::knuth_stddev_pop_finalize(FunctionContext* ctx,
-                                                        const StringVal& state_sv) {
-    const DoubleVal stddev = knuth_stddev_pop_get_value(ctx, state_sv);
-    ctx->free(state_sv.ptr);
-    return stddev;
-}
-
-// DECIMALV2 variant of knuth_stddev_pop_get_value: NULL for an empty state, otherwise
-// DecimalV2Value::sqrt of decimalv2_compute_knuth_variance(state, true).
-DecimalV2Val AggregateFunctions::decimalv2_knuth_stddev_pop_get_value(FunctionContext* ctx,
-                                                                      const StringVal& state_sv) {
-    DCHECK_EQ(state_sv.len, sizeof(DecimalV2KnuthVarianceState));
-    auto* knuth = reinterpret_cast<DecimalV2KnuthVarianceState*>(state_sv.ptr);
-    if (knuth->count != 0) {
-        DecimalV2Value variance = decimalv2_compute_knuth_variance(*knuth, true);
-        DecimalV2Value stddev = DecimalV2Value::sqrt(variance);
-        DecimalV2Val out;
-        stddev.to_decimal_val(&out);
-        return out;
-    }
-    return DecimalV2Val::null();
-}
-
-// Produces the final DECIMALV2 value via decimalv2_knuth_stddev_pop_get_value, then
-// destroys the state object with `delete`.
-DecimalV2Val AggregateFunctions::decimalv2_knuth_stddev_pop_finalize(FunctionContext* ctx,
-                                                                     const StringVal& state_sv) {
-    const DecimalV2Val stddev = decimalv2_knuth_stddev_pop_get_value(ctx, state_sv);
-    delete reinterpret_cast<DecimalV2KnuthVarianceState*>(state_sv.ptr);
-    return stddev;
-}
-
-// Intermediate state shared by the rank functions below.
-struct RankState {
-    // Rank that the next get_value call will report; starts at 1.
-    int64_t rank = 1;
-    // Rows accumulated by rank_update since the last rank_get_value; starts at 0.
-    int64_t count = 0;
-    RankState() = default;
-};
-
-// Allocates a RankState-sized buffer from ctx, attaches it to dst and resets it to a
-// default-constructed RankState (rank 1, count 0).
-void AggregateFunctions::rank_init(FunctionContext* ctx, StringVal* dst) {
-    const int state_size = sizeof(RankState);
-    uint8_t* buffer = ctx->allocate(state_size);
-    dst->is_null = false;
-    dst->ptr = buffer;
-    dst->len = state_size;
-    *reinterpret_cast<RankState*>(buffer) = RankState();
-}
-
-// Counts one more row for the current rank; rank_get_value later adds this count to the
-// rank and resets it.
-void AggregateFunctions::rank_update(FunctionContext* ctx, StringVal* dst) {
-    DCHECK(!dst->is_null);
-    DCHECK_EQ(dst->len, sizeof(RankState));
-    auto* rank_state = reinterpret_cast<RankState*>(dst->ptr);
-    rank_state->count += 1;
-}
-
-// Intentionally a no-op: dense_rank_get_value expects RankState::count to stay 0 (it
-// DCHECKs that), so there is nothing to accumulate per row.
-void AggregateFunctions::dense_rank_update(FunctionContext* ctx, StringVal* dst) {
-    // Nothing to do.
-}
-
-// Returns the current rank and advances the state: the rank grows by the number of rows
-// counted since the previous call, and that counter is reset to 0.
-BigIntVal AggregateFunctions::rank_get_value(FunctionContext* ctx, StringVal& src_val) {
-    DCHECK(!src_val.is_null);
-    DCHECK_EQ(src_val.len, sizeof(RankState));
-    auto* rank_state = reinterpret_cast<RankState*>(src_val.ptr);
-    DCHECK_GT(rank_state->count, 0);
-    DCHECK_GT(rank_state->rank, 0);
-    const int64_t current_rank = rank_state->rank;
-
-    // Set up the rank that the next call will report.
-    rank_state->rank = current_rank + rank_state->count;
-    rank_state->count = 0;
-    return BigIntVal(current_rank);
-}
-
-// Returns the current dense rank and bumps the stored rank by exactly one for the next
-// call. The row counter is not used for DENSE_RANK and must still be 0.
-BigIntVal AggregateFunctions::dense_rank_get_value(FunctionContext* ctx, StringVal& src_val) {
-    DCHECK(!src_val.is_null);
-    DCHECK_EQ(src_val.len, sizeof(RankState));
-    auto* rank_state = reinterpret_cast<RankState*>(src_val.ptr);
-    DCHECK_EQ(rank_state->count, 0);
-    DCHECK_GT(rank_state->rank, 0);
-
-    // Post-increment: report the current rank, leave the next one in the state.
-    const int64_t current_rank = rank_state->rank++;
-    return BigIntVal(current_rank);
-}
-
-// Reads the rank currently stored in the state, releases the state buffer through
-// ctx->free(), and returns that rank.
-BigIntVal AggregateFunctions::rank_finalize(FunctionContext* ctx, StringVal& src_val) {
-    DCHECK(!src_val.is_null);
-    DCHECK_EQ(src_val.len, sizeof(RankState));
-    // Copy the value out before the buffer is freed.
-    const int64_t final_rank = reinterpret_cast<RankState*>(src_val.ptr)->rank;
-    ctx->free(src_val.ptr);
-    return BigIntVal(final_rank);
-}
-
-// LAST_VALUE update for value types: every call simply overwrites dst with src.
-template <class T>
-void AggregateFunctions::last_val_update(FunctionContext* ctx, const T& src, T* dst) {
-    (*dst) = src;
-}
-
-// LAST_VALUE update for strings: dst keeps its own copy of src's bytes.
-//  - NULL src: any buffer dst holds is freed and dst becomes NULL.
-//  - otherwise: dst's buffer is allocated (dst was NULL) or reallocated to src.len bytes,
-//    then src's bytes are copied in.
-template <>
-void AggregateFunctions::last_val_update(FunctionContext* ctx, const StringVal& src,
-                                         StringVal* dst) {
-    if (src.is_null) {
-        if (!dst->is_null) {
-            ctx->free(dst->ptr);
-        }
-        *dst = StringVal::null();
-        return;
-    }
-
-    if (!dst->is_null) {
-        // Reuse the existing buffer, resized to fit the new value.
-        dst->ptr = ctx->reallocate(dst->ptr, src.len);
-    } else {
-        dst->ptr = ctx->allocate(src.len);
-        dst->is_null = false;
-    }
-    dst->len = src.len;
-    memcpy(dst->ptr, src.ptr, src.len);
-}
-
-// LAST_VALUE remove for value types: once the number of removes has caught up with the
-// number of updates, dst is reset to NULL; otherwise dst is left alone.
-template <class T>
-void AggregateFunctions::last_val_remove(FunctionContext* ctx, const T& src, T* dst) {
-    const bool all_removed = ctx->impl()->num_removes() >= ctx->impl()->num_updates();
-    if (all_removed) {
-        *dst = T::null();
-    }
-}
-
-// LAST_VALUE remove for strings: once the number of removes has caught up with the number
-// of updates, dst's buffer (if any) is freed and dst is reset to NULL.
-template <>
-void AggregateFunctions::last_val_remove(FunctionContext* ctx, const StringVal& src,
-                                         StringVal* dst) {
-    const bool all_removed = ctx->impl()->num_removes() >= ctx->impl()->num_updates();
-    if (!all_removed) {
-        return;
-    }
-    if (!dst->is_null) {
-        ctx->free(dst->ptr);
-    }
-    *dst = StringVal::null();
-}
-
-// FIRST_VALUE update for value types: only the first update call stores src in dst; every
-// later call is ignored.
-template <class T>
-void AggregateFunctions::first_val_update(FunctionContext* ctx, const T& src, T* dst) {
-    if (ctx->impl()->num_updates() <= 1) {
-        // num_updates is incremented before Update() is called, so it is never 0 here,
-        // and Remove() is never called for FIRST_VALUE.
-        DCHECK_GT(ctx->impl()->num_updates(), 0);
-        DCHECK_EQ(ctx->impl()->num_removes(), 0);
-        *dst = src;
-    }
-}
-
-// FIRST_VALUE update for strings: only the first update call stores a value. A NULL src
-// makes dst NULL; otherwise dst receives its own copy of src's bytes, allocated from ctx.
-// If that allocation fails for a non-empty value, dst becomes NULL instead of copying
-// into a null pointer (the same fallback offset_fn_init<StringVal> uses).
-template <>
-void AggregateFunctions::first_val_update(FunctionContext* ctx, const StringVal& src,
-                                          StringVal* dst) {
-    if (ctx->impl()->num_updates() > 1) {
-        return;
-    }
-    DCHECK_GT(ctx->impl()->num_updates(), 0);
-    DCHECK_EQ(ctx->impl()->num_removes(), 0);
-    if (src.is_null) {
-        *dst = StringVal::null();
-        return;
-    }
-    uint8_t* copy = ctx->allocate(src.len);
-    // Only a failed non-empty allocation is treated as an error, so an empty string keeps
-    // producing a non-NULL, zero-length result whatever allocate(0) returns.
-    if (UNLIKELY(copy == nullptr && src.len != 0)) {
-        *dst = StringVal::null();
-        return;
-    }
-    *dst = StringVal(copy, src.len);
-    if (src.len != 0) {
-        memcpy(dst->ptr, src.ptr, src.len);
-    }
-}
-
-// Update function for the rewritten form of FIRST_VALUE: it behaves exactly like
-// last_val_update and ignores the extra BigIntVal argument.
-template <class T>
-void AggregateFunctions::first_val_rewrite_update(FunctionContext* ctx, const T& src,
-                                                  const BigIntVal& /*unused*/, T* dst) {
-    last_val_update<T>(ctx, src, dst);
-}
-
-// Initializes dst for the offset functions from the constant third argument (index 2),
-// which must have the same type as argument 0.
-template <class T>
-void AggregateFunctions::offset_fn_init(FunctionContext* ctx, T* dst) {
-    DCHECK_EQ(ctx->get_num_args(), 3);
-    DCHECK(ctx->is_arg_constant(1));
-    DCHECK(ctx->is_arg_constant(2));
-    DCHECK_EQ(ctx->get_arg_type(0)->type, ctx->get_arg_type(2)->type);
-    T default_val = *static_cast<T*>(ctx->get_constant_arg(2));
-    if (UNLIKELY(default_val.is_null)) {
-        // A literal NULL is sometimes incorrectly converted to int, so a full `*dst = src`
-        // copy may SEGV when T is larger than int (e.g. DatetimeVal). Set only the flag.
-        dst->is_null = true;
-        return;
-    }
-    *dst = default_val;
-}
-
-// String specialization of offset_fn_init: dst gets a private copy of the constant third
-// argument. dst becomes NULL when that argument is NULL or when the copy cannot be
-// allocated.
-template <>
-void AggregateFunctions::offset_fn_init(FunctionContext* ctx, StringVal* dst) {
-    DCHECK_EQ(ctx->get_num_args(), 3);
-    DCHECK(ctx->is_arg_constant(1));
-    DCHECK(ctx->is_arg_constant(2));
-    DCHECK_EQ(ctx->get_arg_type(0)->type, ctx->get_arg_type(2)->type);
-    StringVal default_val = *static_cast<StringVal*>(ctx->get_constant_arg(2));
-    if (default_val.is_null) {
-        *dst = StringVal::null();
-        return;
-    }
-    uint8_t* buffer = ctx->allocate(default_val.len);
-    if (UNLIKELY(buffer == nullptr)) {
-        *dst = StringVal::null();
-        return;
-    }
-    *dst = StringVal(buffer, default_val.len);
-    memcpy(dst->ptr, default_val.ptr, default_val.len);
-}
-
-// Update for the offset functions: dst is overwritten with src. The offset argument and
-// default_value are not consulted here.
-template <class T>
-void AggregateFunctions::offset_fn_update(FunctionContext* ctx, const T& src,
-                                          const BigIntVal& /*unused*/, const T& default_value,
-                                          T* dst) {
-    (*dst) = src;
-}
-
-// Refer to AggregateFunctionWindowFunnel.h in https://github.com/ClickHouse/ClickHouse.git
-//
-// Aggregation state for window_funnel. It collects (timestamp, event index) pairs and, in
-// get_event_level(), determines how many consecutive event levels (0, 1, 2, ...) can be
-// chained such that each event lies within `window` seconds of the chain's first timestamp.
-struct WindowFunnelState {
-    // Recorded events as (timestamp, zero-based event index) pairs.
-    std::vector<std::pair<DateTimeValue, int>> events;
-    // Number of event levels; set by add() and used to size the lookup in get_event_level().
-    int max_event_level;
-    // True while `events` is known to be in ascending (timestamp, event index) order.
-    bool sorted;
-    // Window length; get_event_level() adds it to a timestamp as a SECOND interval.
-    int64_t window;
-
-    // Starts empty: no events, level 0, window 0, and trivially sorted.
-    WindowFunnelState() {
-        sorted = true;
-        max_event_level = 0;
-        window = 0;
-    }
-
-    // Appends one event and records event_num as max_event_level. While the list is still
-    // sorted, the new entry is compared with the last one: equal timestamps must not
-    // decrease in event index, otherwise the timestamp must be strictly greater; if not,
-    // `sorted` is cleared.
-    void add(DateTimeValue& timestamp, int event_idx, int event_num) {
-        max_event_level = event_num;
-        if (sorted && events.size() > 0) {
-            if (events.back().first == timestamp) {
-                sorted = events.back().second <= event_idx;
-            } else {
-                sorted = events.back().first < timestamp;
-            }
-        }
-        events.emplace_back(timestamp, event_idx);
-    }
-
-    // Stable-sorts the events unless they are already known to be sorted.
-    // NOTE(review): `sorted` is not set to true afterwards, so a later sort()/merge() would
-    // sort again; in this file sort() is only called right before the state is deleted.
-    void sort() {
-        if (sorted) {
-            return;
-        }
-        std::stable_sort(events.begin(), events.end());
-    }
-
-    // Walks the events in stored order and returns the deepest level reached (0 if none).
-    // events_timestamp[k] holds the first-event timestamp of a chain that has reached
-    // level k. Callers are expected to have sorted the events first (window_funnel_finalize
-    // calls sort() before this).
-    int get_event_level() {
-        std::vector<std::optional<DateTimeValue>> events_timestamp(max_event_level);
-        for (int64_t i = 0; i < events.size(); i++) {
-            int& event_idx = events[i].second;
-            DateTimeValue& timestamp = events[i].first;
-            // A level-0 event always (re)starts a chain at its own timestamp.
-            if (event_idx == 0) {
-                events_timestamp[0] = timestamp;
-                continue;
-            }
-            // A deeper event only counts if the previous level has already been reached.
-            if (events_timestamp[event_idx - 1].has_value()) {
-                DateTimeValue& first_timestamp = events_timestamp[event_idx - 1].value();
-                // last_timestamp = chain start + window seconds: the latest accepted time.
-                DateTimeValue last_timestamp = first_timestamp;
-                TimeInterval interval(SECOND, window, false);
-                last_timestamp.date_add_interval(interval, SECOND);
-
-                if (timestamp <= last_timestamp) {
-                    // Propagate the chain's start time to this level.
-                    events_timestamp[event_idx] = first_timestamp;
-                    if (event_idx + 1 == max_event_level) {
-                        // Usually, max event level is small.
-                        return max_event_level;
-                    }
-                }
-            }
-        }
-
-        // No full chain: report the highest level that was reached.
-        for (int64_t i = events_timestamp.size() - 1; i >= 0; i--) {
-            if (events_timestamp[i].has_value()) {
-                return i + 1;
-            }
-        }
-
-        return 0;
-    }
-
-    // Appends other's events to this state and leaves the combined list sorted. Each half
-    // is stable-sorted only if it was not already sorted, then the halves are merged in
-    // place. max_event_level and window keep their own value when positive and otherwise
-    // take other's.
-    void merge(WindowFunnelState* other) {
-        if (other->events.empty()) {
-            return;
-        }
-
-        int64_t orig_size = events.size();
-        events.insert(std::end(events), std::begin(other->events), std::end(other->events));
-        // Iterators are taken after the insert; `middle` marks where other's events begin.
-        const auto begin = std::begin(events);
-        const auto middle = std::next(events.begin(), orig_size);
-        const auto end = std::end(events);
-        if (!other->sorted) {
-            std::stable_sort(middle, end);
-        }
-
-        if (!sorted) {
-            std::stable_sort(begin, middle);
-        }
-        std::inplace_merge(begin, middle, end);
-        max_event_level = max_event_level > 0 ? max_event_level : other->max_event_level;
-        window = window > 0 ? window : other->window;
-
-        sorted = true;
-    }
-
-    // Number of bytes serialize() writes: a fixed header (int max_event_level, int64_t
-    // window, uint64_t event count) followed by one (int64_t, int) record per event.
-    int64_t serialized_size() {
-        return sizeof(int) + sizeof(int64_t) + sizeof(uint64_t) +
-               events.size() * (sizeof(int64_t) + sizeof(int));
-    }
-
-    // Writes the state to buf, which must hold at least serialized_size() bytes. Layout, in
-    // order and without padding: max_event_level (int), window (int64_t), event count
-    // (uint64_t), then per event its timestamp converted to int64_t and its event index (int).
-    void serialize(uint8_t* buf) {
-        memcpy(buf, &max_event_level, sizeof(int));
-        buf += sizeof(int);
-        memcpy(buf, &window, sizeof(int64_t));
-        buf += sizeof(int64_t);
-
-        uint64_t event_num = events.size();
-        memcpy(buf, &event_num, sizeof(uint64_t));
-        buf += sizeof(uint64_t);
-        for (int64_t i = 0; i < events.size(); i++) {
-            // Relies on DateTimeValue being convertible to int64_t.
-            int64_t timestamp = events[i].first;
-            int event_idx = events[i].second;
-            memcpy(buf, &timestamp, sizeof(int64_t));
-            buf += sizeof(int64_t);
-            memcpy(buf, &event_idx, sizeof(int));
-            buf += sizeof(int);
-        }
-    }
-
-    // Reads a buffer in the layout produced by serialize(). Events are re-inserted through
-    // add(), so `sorted` is recomputed from the order found in the buffer. The buffer length
-    // is not checked; the embedded event count is trusted.
-    void deserialize(uint8_t* buf) {
-        uint64_t size;
-
-        memcpy(&max_event_level, buf, sizeof(int));
-        buf += sizeof(int);
-        memcpy(&window, buf, sizeof(int64_t));
-        buf += sizeof(int64_t);
-        memcpy(&size, buf, sizeof(uint64_t));
-        buf += sizeof(uint64_t);
-        for (int64_t i = 0; i < size; i++) {
-            int64_t timestamp;
-            int event_idx;
-
-            memcpy(&timestamp, buf, sizeof(int64_t));
-            buf += sizeof(int64_t);
-            memcpy(&event_idx, buf, sizeof(int));
-            buf += sizeof(int);
-            DateTimeValue time_value;
-            time_value.from_date_int64(timestamp);
-            add(time_value, event_idx, max_event_level);
-        }
-    }
-};
-
-// Creates a heap-allocated WindowFunnelState and stores its address in dst. If argument 0
-// (the window) is constant, its value is copied into the state right away.
-void AggregateFunctions::window_funnel_init(FunctionContext* ctx, StringVal* dst) {
-    auto* funnel = new WindowFunnelState();
-    dst->ptr = reinterpret_cast<uint8_t*>(funnel);
-    dst->len = sizeof(WindowFunnelState);
-    dst->is_null = false;
-    // The constant arguments are at index 0 and 1; only the window (index 0) is read here.
-    if (ctx->is_arg_constant(0)) {
-        funnel->window = reinterpret_cast<BigIntVal*>(ctx->get_constant_arg(0))->val;
-    }
-    // TODO handle mode in the future
-}
-
-// Records one input row in the funnel state: for every condition that is non-NULL and
-// true, an event (timestamp, condition index) is added. Rows with a NULL timestamp are
-// ignored. The window and mode arguments are not read here.
-void AggregateFunctions::window_funnel_update(FunctionContext* ctx, const BigIntVal& window,
-                                              const StringVal& mode, const DateTimeVal& timestamp,
-                                              int num_cond, const BooleanVal* conds,
-                                              StringVal* dst) {
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(WindowFunnelState), dst->len);
-
-    if (!timestamp.is_null) {
-        auto* funnel = reinterpret_cast<WindowFunnelState*>(dst->ptr);
-        for (int cond_idx = 0; cond_idx < num_cond; ++cond_idx) {
-            const BooleanVal& cond = conds[cond_idx];
-            if (!cond.is_null && cond.val) {
-                DateTimeValue event_time = DateTimeValue::from_datetime_val(timestamp);
-                funnel->add(event_time, cond_idx, num_cond);
-            }
-        }
-    }
-}
-
-// Serializes the funnel state referenced by src into a new StringVal allocated from ctx
-// and destroys the in-memory state. The result is exactly serialized_size() bytes: the
-// previous version allocated sizeof(double) extra bytes that serialize() never wrote, so
-// uninitialized memory was appended to every intermediate result. Readers are unaffected
-// because deserialize() relies on the embedded event count, not on the buffer length.
-StringVal AggregateFunctions::window_funnel_serialize(FunctionContext* ctx, const StringVal& src) {
-    WindowFunnelState* state = reinterpret_cast<WindowFunnelState*>(src.ptr);
-    const int64_t serialized_size = state->serialized_size();
-    StringVal result(ctx, serialized_size);
-    state->serialize(result.ptr);
-
-    // The state was created with `new` in window_funnel_init; ownership ends here.
-    delete state;
-    return result;
-}
-
-// Merges a serialized funnel state (src, in the layout written by
-// WindowFunnelState::serialize) into the live state referenced by dst.
-// The temporary deserialized state lives on the stack, which avoids a heap allocation per
-// merge and cannot leak if deserialize() or merge() throws (e.g. std::bad_alloc from the
-// event vector).
-void AggregateFunctions::window_funnel_merge(FunctionContext* ctx, const StringVal& src,
-                                             StringVal* dst) {
-    DCHECK(dst->ptr != nullptr);
-    DCHECK_EQ(sizeof(WindowFunnelState), dst->len);
-    WindowFunnelState* dst_state = reinterpret_cast<WindowFunnelState*>(dst->ptr);
-
-    WindowFunnelState src_state;
-    src_state.deserialize(src.ptr);
-    dst_state->merge(&src_state);
-}
-
-// Computes the final funnel level: sorts the collected events, evaluates
-// get_event_level(), destroys the state and returns the level as an IntVal.
-IntVal AggregateFunctions::window_funnel_finalize(FunctionContext* ctx, const StringVal& src) {
-    DCHECK(!src.is_null);
-
-    auto* funnel = reinterpret_cast<WindowFunnelState*>(src.ptr);
-    funnel->sort();
-    const int level = funnel->get_event_level();
-    delete funnel;
-    return doris_udf::IntVal(level);
-}
-
-// Explicit template instantiations: stamp out the templates for the types we need.
-// init_zero_null is instantiated for the BigIntVal, LargeIntVal and DoubleVal accumulators.
-template void AggregateFunctions::init_zero_null<BigIntVal>(FunctionContext*, BigIntVal* dst);
-template void AggregateFunctions::init_zero_null<LargeIntVal>(FunctionContext*, LargeIntVal* dst);
-template void AggregateFunctions::init_zero_null<DoubleVal>(FunctionContext*, DoubleVal* dst);
-
-// init_zero: same three accumulator types as init_zero_null above.
-template void AggregateFunctions::init_zero<BigIntVal>(FunctionContext*, BigIntVal* dst);
-template void AggregateFunctions::init_zero<LargeIntVal>(FunctionContext*, LargeIntVal* dst);
-template void AggregateFunctions::init_zero<DoubleVal>(FunctionContext*, DoubleVal* dst);
-
-// init_zero_not_null is only instantiated for BigIntVal.
-template void AggregateFunctions::init_zero_not_null<BigIntVal>(FunctionContext*, BigIntVal* dst);
-
-// sum_remove<SRC, DST>: the integer inputs up to BigIntVal accumulate into BigIntVal,
-// the floating-point inputs into DoubleVal, and LargeIntVal into LargeIntVal.
-template void AggregateFunctions::sum_remove<BooleanVal, BigIntVal>(FunctionContext*,
-                                                                    const BooleanVal& src,
-                                                                    BigIntVal* dst);
-template void AggregateFunctions::sum_remove<TinyIntVal, BigIntVal>(FunctionContext*,
-                                                                    const TinyIntVal& src,
-                                                                    BigIntVal* dst);
-template void AggregateFunctions::sum_remove<SmallIntVal, BigIntVal>(FunctionContext*,
-                                                                     const SmallIntVal& src,
-                                                                     BigIntVal* dst);
-template void AggregateFunctions::sum_remove<IntVal, BigIntVal>(FunctionContext*, const IntVal& src,
-                                                                BigIntVal* dst);
-template void AggregateFunctions::sum_remove<BigIntVal, BigIntVal>(FunctionContext*,
-                                                                   const BigIntVal& src,
-                                                                   BigIntVal* dst);
-template void AggregateFunctions::sum_remove<FloatVal, DoubleVal>(FunctionContext*,
-                                                                  const FloatVal& src,
-                                                                  DoubleVal* dst);
-template void AggregateFunctions::sum_remove<DoubleVal, DoubleVal>(FunctionContext*,
-                                                                   const DoubleVal& src,
-                                                                   DoubleVal* dst);
-template void AggregateFunctions::sum_remove<LargeIntVal, LargeIntVal>(FunctionContext*,
-                                                                       const LargeIntVal& src,
-                                                                       LargeIntVal* dst);
-
-// avg_update / avg_remove instantiations; the intermediate state is carried in a StringVal.
-// avg_remove<BooleanVal> and the TinyIntVal/SmallIntVal variants are instantiated further
-// down in this file.
-template void AggregateFunctions::avg_update<doris_udf::BooleanVal>(doris_udf::FunctionContext*,
-                                                                    doris_udf::BooleanVal const&,
-                                                                    doris_udf::StringVal*);
-template void AggregateFunctions::avg_update<doris_udf::IntVal>(doris_udf::FunctionContext*,
-                                                                doris_udf::IntVal const&,
-                                                                doris_udf::StringVal*);
-template void AggregateFunctions::avg_remove<doris_udf::IntVal>(doris_udf::FunctionContext*,
-                                                                doris_udf::IntVal const&,
-                                                                doris_udf::StringVal*);
-template void AggregateFunctions::avg_update<doris_udf::BigIntVal>(doris_udf::FunctionContext*,
-                                                                   doris_udf::BigIntVal const&,
-                                                                   doris_udf::StringVal*);
-template void AggregateFunctions::avg_remove<doris_udf::BigIntVal>(doris_udf::FunctionContext*,
-                                                                   doris_udf::BigIntVal const&,
-                                                                   doris_udf::StringVal*);
-template void AggregateFunctions::avg_update<doris_udf::FloatVal>(doris_udf::FunctionContext*,
-                                                                  doris_udf::FloatVal const&,
-                                                                  doris_udf::StringVal*);
-template void AggregateFunctions::avg_remove<doris_udf::FloatVal>(doris_udf::FunctionContext*,
-                                                                  doris_udf::FloatVal const&,
-                                                                  doris_udf::StringVal*);
-template void AggregateFunctions::avg_update<doris_udf::DoubleVal>(doris_udf::FunctionContext*,
-                                                                   doris_udf::DoubleVal const&,
-                                                                   doris_udf::StringVal*);
-template void AggregateFunctions::avg_remove<doris_udf::DoubleVal>(doris_udf::FunctionContext*,
-                                                                   doris_udf::DoubleVal const&,
-                                                                   doris_udf::StringVal*);
-// Disabled: LargeIntVal instantiations. They are still written with the older
-// AvgUpdate/AvgRemove spelling rather than avg_update/avg_remove.
-//template void AggregateFunctions::AvgUpdate<doris_udf::LargeIntVal>(
-//doris_udf::FunctionContext*, doris_udf::LargeIntVal const&, doris_udf::StringVal*);
-//template void AggregateFunctions::AvgRemove<doris_udf::LargeIntVal>(
-//doris_udf::FunctionContext*, doris_udf::LargeIntVal const&, doris_udf::StringVal*);
-
-// sum<SRC, DST>: same source/accumulator pairings as the sum_remove instantiations.
-template void AggregateFunctions::sum<BooleanVal, BigIntVal>(FunctionContext*,
-                                                             const BooleanVal& src, BigIntVal* dst);
-template void AggregateFunctions::sum<TinyIntVal, BigIntVal>(FunctionContext*,
-                                                             const TinyIntVal& src, BigIntVal* dst);
-template void AggregateFunctions::sum<SmallIntVal, BigIntVal>(FunctionContext*,
-                                                              const SmallIntVal& src,
-                                                              BigIntVal* dst);
-template void AggregateFunctions::sum<IntVal, BigIntVal>(FunctionContext*, const IntVal& src,
-                                                         BigIntVal* dst);
-template void AggregateFunctions::sum<BigIntVal, BigIntVal>(FunctionContext*, const BigIntVal& src,
-                                                            BigIntVal* dst);
-template void AggregateFunctions::sum<LargeIntVal, LargeIntVal>(FunctionContext*,
-                                                                const LargeIntVal& src,
-                                                                LargeIntVal* dst);
-template void AggregateFunctions::sum<FloatVal, DoubleVal>(FunctionContext*, const FloatVal& src,
-                                                           DoubleVal* dst);
-template void AggregateFunctions::sum<DoubleVal, DoubleVal>(FunctionContext*, const DoubleVal& src,
-                                                            DoubleVal* dst);
-
-// min_init instantiations, one per value type listed: the boolean, integer and
-// floating-point types plus DateTimeVal, DecimalV2Val and StringVal.
-template void AggregateFunctions::min_init<BooleanVal>(doris_udf::FunctionContext*,
-                                                       BooleanVal* dst);
-template void AggregateFunctions::min_init<TinyIntVal>(doris_udf::FunctionContext*,
-                                                       TinyIntVal* dst);
-template void AggregateFunctions::min_init<SmallIntVal>(doris_udf::FunctionContext*,
-                                                        SmallIntVal* dst);
-template void AggregateFunctions::min_init<IntVal>(doris_udf::FunctionContext*, IntVal* dst);
-template void AggregateFunctions::min_init<BigIntVal>(doris_udf::FunctionContext*, BigIntVal* dst);
-template void AggregateFunctions::min_init<LargeIntVal>(doris_udf::FunctionContext*,
-                                                        LargeIntVal* dst);
-template void AggregateFunctions::min_init<FloatVal>(doris_udf::FunctionContext*, FloatVal* dst);
-template void AggregateFunctions::min_init<DoubleVal>(doris_udf::FunctionContext*, DoubleVal* dst);
-template void AggregateFunctions::min_init<DateTimeVal>(doris_udf::FunctionContext*,
-                                                        DateTimeVal* dst);
-template void AggregateFunctions::min_init<DecimalV2Val>(doris_udf::FunctionContext*,
-                                                         DecimalV2Val* dst);
-template void AggregateFunctions::min_init<StringVal>(doris_udf::FunctionContext*, StringVal* dst);
-
-// min<T> instantiations for the boolean, integer and floating-point value types.
-// Unlike min_init, DateTimeVal, DecimalV2Val and StringVal are not instantiated in this list.
-template void AggregateFunctions::min<BooleanVal>(FunctionContext*, const BooleanVal& src,
-                                                  BooleanVal* dst);
-template void AggregateFunctions::min<TinyIntVal>(FunctionContext*, const TinyIntVal& src,
-                                                  TinyIntVal* dst);
-template void AggregateFunctions::min<SmallIntVal>(FunctionContext*, const SmallIntVal& src,
-                                                   SmallIntVal* dst);
-template void AggregateFunctions::min<IntVal>(FunctionContext*, const IntVal& src, IntVal* dst);
-template void AggregateFunctions::min<BigIntVal>(FunctionContext*, const BigIntVal& src,
-                                                 BigIntVal* dst);
-template void AggregateFunctions::min<LargeIntVal>(FunctionContext*, const LargeIntVal& src,
-                                                   LargeIntVal* dst);
-template void AggregateFunctions::min<FloatVal>(FunctionContext*, const FloatVal& src,
-                                                FloatVal* dst);
-template void AggregateFunctions::min<DoubleVal>(FunctionContext*, const DoubleVal& src,
-                                                 DoubleVal* dst);
-
-template void AggregateFunctions::avg_remove<doris_udf::BooleanVal>(doris_udf::FunctionContext*,
-                                                                    doris_udf::BooleanVal const&,
-                                                                    doris_udf::StringVal*);
-template void AggregateFunctions::avg_update<doris_udf::TinyIntVal>(doris_udf::FunctionContext*,
-                                                                    doris_udf::TinyIntVal const&,
-                                                                    doris_udf::StringVal*);
-template void AggregateFunctions::avg_remove<doris_udf::TinyIntVal>(doris_udf::FunctionContext*,
-                                                                    doris_udf::TinyIntVal const&,
-                                                                    doris_udf::StringVal*);
-template void AggregateFunctions::avg_update<doris_udf::SmallIntVal>(doris_udf::FunctionContext*,
-                                                                     doris_udf::SmallIntVal const&,
-                                                                     doris_udf::StringVal*);
-template void AggregateFunctions::avg_remove<doris_udf::SmallIntVal>(doris_udf::FunctionContext*,
-                                                                     doris_udf::SmallIntVal const&,
-                                                                     doris_udf::StringVal*);
-
-template void AggregateFunctions::max_init<BooleanVal>(doris_udf::FunctionContext*,
-                                                       BooleanVal* dst);
-template void AggregateFunctions::max_init<TinyIntVal>(doris_udf::FunctionContext*,
-                                                       TinyIntVal* dst);
-template void AggregateFunctions::max_init<SmallIntVal>(doris_udf::FunctionContext*,
-                                                        SmallIntVal* dst);
-template void AggregateFunctions::max_init<IntVal>(doris_udf::FunctionContext*, IntVal* dst);
-template void AggregateFunctions::max_init<BigIntVal>(doris_udf::FunctionContext*, BigIntVal* dst);
-template void AggregateFunctions::max_init<LargeIntVal>(doris_udf::FunctionContext*,
-                                                        LargeIntVal* dst);
-template void AggregateFunctions::max_init<FloatVal>(doris_udf::FunctionContext*, FloatVal* dst);
-template void AggregateFunctions::max_init<DoubleVal>(doris_udf::FunctionContext*, DoubleVal* dst);
-template void AggregateFunctions::max_init<DateTimeVal>(doris_udf::FunctionContext*,
-                                                        DateTimeVal* dst);
-template void AggregateFunctions::max_init<DecimalV2Val>(doris_udf::FunctionContext*,
-                                                         DecimalV2Val* dst);
-template void AggregateFunctions::max_init<StringVal>(doris_udf::FunctionContext*, StringVal* dst);
-
-template void AggregateFunctions::max<BooleanVal>(FunctionContext*, const BooleanVal& src,
-                                                  BooleanVal* dst);
-template void AggregateFunctions::max<TinyIntVal>(FunctionContext*, const TinyIntVal& src,
-                                                  TinyIntVal* dst);
-template void AggregateFunctions::max<SmallIntVal>(FunctionContext*, const SmallIntVal& src,
-                                                   SmallIntVal* dst);
-template void AggregateFunctions::max<IntVal>(FunctionContext*, const IntVal& src, IntVal* dst);
-template void AggregateFunctions::max<BigIntVal>(FunctionContext*, const BigIntVal& src,
-                                                 BigIntVal* dst);
-template void AggregateFunctions::max<LargeIntVal>(FunctionContext*, const LargeIntVal& src,
-                                                   LargeIntVal* dst);
-template void AggregateFunctions::max<FloatVal>(FunctionContext*, const FloatVal& src,
-                                                FloatVal* dst);
-template void AggregateFunctions::max<DoubleVal>(FunctionContext*, const DoubleVal& src,
-                                                 DoubleVal* dst);
-
-template void AggregateFunctions::any_init<BooleanVal>(doris_udf::FunctionContext*,
-                                                       BooleanVal* dst);
-template void AggregateFunctions::any_init<TinyIntVal>(doris_udf::FunctionContext*,
-                                                       TinyIntVal* dst);
-template void AggregateFunctions::any_init<SmallIntVal>(doris_udf::FunctionContext*,
-                                                        SmallIntVal* dst);
-template void AggregateFunctions::any_init<IntVal>(doris_udf::FunctionContext*, IntVal* dst);
-template void AggregateFunctions::any_init<BigIntVal>(doris_udf::FunctionContext*, BigIntVal* dst);
-template void AggregateFunctions::any_init<LargeIntVal>(doris_udf::FunctionContext*,
-                                                        LargeIntVal* dst);
-template void AggregateFunctions::any_init<FloatVal>(doris_udf::FunctionContext*, FloatVal* dst);
-template void AggregateFunctions::any_init<DoubleVal>(doris_udf::FunctionContext*, DoubleVal* dst);
-template void AggregateFunctions::any_init<DateTimeVal>(doris_udf::FunctionContext*,
-                                                        DateTimeVal* dst);
-template void AggregateFunctions::any_init<DecimalV2Val>(doris_udf::FunctionContext*,
-                                                         DecimalV2Val* dst);
-template void AggregateFunctions::any_init<StringVal>(doris_udf::FunctionContext*, StringVal* dst);
-
-template void AggregateFunctions::any<BooleanVal>(FunctionContext*, const BooleanVal& src,
-                                                  BooleanVal* dst);
-template void AggregateFunctions::any<TinyIntVal>(FunctionContext*, const TinyIntVal& src,
-                                                  TinyIntVal* dst);
-template void AggregateFunctions::any<SmallIntVal>(FunctionContext*, const SmallIntVal& src,
-                                                   SmallIntVal* dst);
-template void AggregateFunctions::any<IntVal>(FunctionContext*, const IntVal& src, IntVal* dst);
-template void AggregateFunctions::any<BigIntVal>(FunctionContext*, const BigIntVal& src,
-                                                 BigIntVal* dst);
-template void AggregateFunctions::any<LargeIntVal>(FunctionContext*, const LargeIntVal& src,
-                                                   LargeIntVal* dst);
-template void AggregateFunctions::any<FloatVal>(FunctionContext*, const FloatVal& src,
-                                                FloatVal* dst);
-template void AggregateFunctions::any<DoubleVal>(FunctionContext*, const DoubleVal& src,
-                                                 DoubleVal* dst);
-
-template void AggregateFunctions::pc_update(FunctionContext*, const BooleanVal&, StringVal*);
-template void AggregateFunctions::pc_update(FunctionContext*, const TinyIntVal&, StringVal*);
-template void AggregateFunctions::pc_update(FunctionContext*, const SmallIntVal&, StringVal*);
-template void AggregateFunctions::pc_update(FunctionContext*, const IntVal&, StringVal*);
-template void AggregateFunctions::pc_update(FunctionContext*, const BigIntVal&, StringVal*);
-template void AggregateFunctions::pc_update(FunctionContext*, const FloatVal&, StringVal*);
-template void AggregateFunctions::pc_update(FunctionContext*, const DoubleVal&, StringVal*);
-template void AggregateFunctions::pc_update(FunctionContext*, const StringVal&, StringVal*);
-template void AggregateFunctions::pc_update(FunctionContext*, const DateTimeVal&, StringVal*);
-
-template void AggregateFunctions::pcsa_update(FunctionContext*, const BooleanVal&, StringVal*);
-template void AggregateFunctions::pcsa_update(FunctionContext*, const TinyIntVal&, StringVal*);
-template void AggregateFunctions::pcsa_update(FunctionContext*, const SmallIntVal&, StringVal*);
-template void AggregateFunctions::pcsa_update(FunctionContext*, const IntVal&, StringVal*);
-template void AggregateFunctions::pcsa_update(FunctionContext*, const BigIntVal&, StringVal*);
-template void AggregateFunctions::pcsa_update(FunctionContext*, const FloatVal&, StringVal*);
-template void AggregateFunctions::pcsa_update(FunctionContext*, const DoubleVal&, StringVal*);
-template void AggregateFunctions::pcsa_update(FunctionContext*, const StringVal&, StringVal*);
-template void AggregateFunctions::pcsa_update(FunctionContext*, const DateTimeVal&, StringVal*);
-
-template void AggregateFunctions::hll_update(FunctionContext*, const BooleanVal&, StringVal*);
-template void AggregateFunctions::hll_update(FunctionContext*, const TinyIntVal&, StringVal*);
-template void AggregateFunctions::hll_update(FunctionContext*, const SmallIntVal&, StringVal*);
-template void AggregateFunctions::hll_update(FunctionContext*, const IntVal&, StringVal*);
-template void AggregateFunctions::hll_update(FunctionContext*, const BigIntVal&, StringVal*);
-template void AggregateFunctions::hll_update(FunctionContext*, const FloatVal&, StringVal*);
-template void AggregateFunctions::hll_update(FunctionContext*, const DoubleVal&, StringVal*);
-template void AggregateFunctions::hll_update(FunctionContext*, const StringVal&, StringVal*);
-template void AggregateFunctions::hll_update(FunctionContext*, const DateTimeVal&, StringVal*);
-template void AggregateFunctions::hll_update(FunctionContext*, const LargeIntVal&, StringVal*);
-template void AggregateFunctions::hll_update(FunctionContext*, const DecimalV2Val&, StringVal*);
-
-template void AggregateFunctions::count_or_sum_distinct_numeric_init<TinyIntVal>(
-        FunctionContext* ctx, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_init<SmallIntVal>(
-        FunctionContext* ctx, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_init<IntVal>(FunctionContext* ctx,
-                                                                             StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_init<BigIntVal>(
-        FunctionContext* ctx, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_init<FloatVal>(FunctionContext* ctx,
-                                                                               StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_init<DoubleVal>(
-        FunctionContext* ctx, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_init<LargeIntVal>(
-        FunctionContext* ctx, StringVal* dst);
-
-template void AggregateFunctions::count_or_sum_distinct_numeric_update<TinyIntVal>(
-        FunctionContext* ctx, TinyIntVal& src, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_update<SmallIntVal>(
-        FunctionContext* ctx, SmallIntVal& src, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_update<IntVal>(FunctionContext* ctx,
-                                                                               IntVal& src,
-                                                                               StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_update<BigIntVal>(
-        FunctionContext* ctx, BigIntVal& src, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_update<FloatVal>(
-        FunctionContext* ctx, FloatVal& src, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_update<DoubleVal>(
-        FunctionContext* ctx, DoubleVal& src, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_update<LargeIntVal>(
-        FunctionContext* ctx, LargeIntVal& src, StringVal* dst);
-
-template void AggregateFunctions::count_or_sum_distinct_numeric_merge<TinyIntVal>(
-        FunctionContext* ctx, StringVal& src, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_merge<SmallIntVal>(
-        FunctionContext* ctx, StringVal& src, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_merge<IntVal>(FunctionContext* ctx,
-                                                                              StringVal& src,
-                                                                              StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_merge<BigIntVal>(
-        FunctionContext* ctx, StringVal& src, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_merge<FloatVal>(
-        FunctionContext* ctx, StringVal& src, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_merge<DoubleVal>(
-        FunctionContext* ctx, StringVal& src, StringVal* dst);
-template void AggregateFunctions::count_or_sum_distinct_numeric_merge<LargeIntVal>(
-        FunctionContext* ctx, StringVal& src, StringVal* dst);
-
-template StringVal AggregateFunctions::count_or_sum_distinct_numeric_serialize<TinyIntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template StringVal AggregateFunctions::count_or_sum_distinct_numeric_serialize<SmallIntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template StringVal AggregateFunctions::count_or_sum_distinct_numeric_serialize<IntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template StringVal AggregateFunctions::count_or_sum_distinct_numeric_serialize<BigIntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template StringVal AggregateFunctions::count_or_sum_distinct_numeric_serialize<FloatVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template StringVal AggregateFunctions::count_or_sum_distinct_numeric_serialize<DoubleVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template StringVal AggregateFunctions::count_or_sum_distinct_numeric_serialize<LargeIntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-
-template BigIntVal AggregateFunctions::count_or_sum_distinct_numeric_finalize<TinyIntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template BigIntVal AggregateFunctions::count_or_sum_distinct_numeric_finalize<SmallIntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template BigIntVal AggregateFunctions::count_or_sum_distinct_numeric_finalize<IntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template BigIntVal AggregateFunctions::count_or_sum_distinct_numeric_finalize<BigIntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template BigIntVal AggregateFunctions::count_or_sum_distinct_numeric_finalize<FloatVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template BigIntVal AggregateFunctions::count_or_sum_distinct_numeric_finalize<DoubleVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template BigIntVal AggregateFunctions::count_or_sum_distinct_numeric_finalize<LargeIntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-
-template BigIntVal AggregateFunctions::sum_distinct_bigint_finalize<TinyIntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template BigIntVal AggregateFunctions::sum_distinct_bigint_finalize<SmallIntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template BigIntVal AggregateFunctions::sum_distinct_bigint_finalize<IntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-template BigIntVal AggregateFunctions::sum_distinct_bigint_finalize<BigIntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-
-template DoubleVal AggregateFunctions::sum_distinct_double_finalize<DoubleVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-
-template LargeIntVal AggregateFunctions::sum_distinct_largeint_finalize<LargeIntVal>(
-        FunctionContext* ctx, const StringVal& state_sv);
-
-template void AggregateFunctions::knuth_var_update(FunctionContext*, const TinyIntVal&, StringVal*);
-template void AggregateFunctions::knuth_var_update(FunctionContext*, const SmallIntVal&,
-                                                   StringVal*);
-template void AggregateFunctions::knuth_var_update(FunctionContext*, const IntVal&, StringVal*);
-template void AggregateFunctions::knuth_var_update(FunctionContext*, const BigIntVal&, StringVal*);
-template void AggregateFunctions::knuth_var_update(FunctionContext*, const FloatVal&, StringVal*);
-template void AggregateFunctions::knuth_var_update(FunctionContext*, const DoubleVal&, StringVal*);
-
-template void AggregateFunctions::knuth_var_remove(FunctionContext*, const TinyIntVal&, StringVal*);
-template void AggregateFunctions::knuth_var_remove(FunctionContext*, const SmallIntVal&,
-                                                   StringVal*);
-template void AggregateFunctions::knuth_var_remove(FunctionContext*, const IntVal&, StringVal*);
-template void AggregateFunctions::knuth_var_remove(FunctionContext*, const BigIntVal&, StringVal*);
-template void AggregateFunctions::knuth_var_remove(FunctionContext*, const FloatVal&, StringVal*);
-template void AggregateFunctions::knuth_var_remove(FunctionContext*, const DoubleVal&, StringVal*);
-
-template void AggregateFunctions::first_val_update<BooleanVal>(FunctionContext*,
-                                                               const BooleanVal& src,
-                                                               BooleanVal* dst);
-template void AggregateFunctions::first_val_update<TinyIntVal>(FunctionContext*,
-                                                               const TinyIntVal& src,
-                                                               TinyIntVal* dst);
-template void AggregateFunctions::first_val_update<SmallIntVal>(FunctionContext*,
-                                                                const SmallIntVal& src,
-                                                                SmallIntVal* dst);
-template void AggregateFunctions::first_val_update<IntVal>(FunctionContext*, const IntVal& src,
-                                                           IntVal* dst);
-template void AggregateFunctions::first_val_update<BigIntVal>(FunctionContext*,
-                                                              const BigIntVal& src, BigIntVal* dst);
-template void AggregateFunctions::first_val_update<FloatVal>(FunctionContext*, const FloatVal& src,
-                                                             FloatVal* dst);
-template void AggregateFunctions::first_val_update<DoubleVal>(FunctionContext*,
-                                                              const DoubleVal& src, DoubleVal* dst);
-
-template void AggregateFunctions::first_val_update<DateTimeVal>(FunctionContext*,
-                                                                const DateTimeVal& src,
-                                                                DateTimeVal* dst);
-
-template void AggregateFunctions::first_val_rewrite_update<BooleanVal>(FunctionContext*,
-                                                                       const BooleanVal& src,
-                                                                       const BigIntVal&,
-                                                                       BooleanVal* dst);
-template void AggregateFunctions::first_val_rewrite_update<TinyIntVal>(FunctionContext*,
-                                                                       const TinyIntVal& src,
-                                                                       const BigIntVal&,
-                                                                       TinyIntVal* dst);
-template void AggregateFunctions::first_val_rewrite_update<SmallIntVal>(FunctionContext*,
-                                                                        const SmallIntVal& src,
-                                                                        const BigIntVal&,
-                                                                        SmallIntVal* dst);
-template void AggregateFunctions::first_val_rewrite_update<IntVal>(FunctionContext*,
-                                                                   const IntVal& src,
-                                                                   const BigIntVal&, IntVal* dst);
-template void AggregateFunctions::first_val_rewrite_update<BigIntVal>(FunctionContext*,
-                                                                      const BigIntVal& src,
-                                                                      const BigIntVal&,
-                                                                      BigIntVal* dst);
-template void AggregateFunctions::first_val_rewrite_update<FloatVal>(FunctionContext*,
-                                                                     const FloatVal& src,
-                                                                     const BigIntVal&,
-                                                                     FloatVal* dst);
-template void AggregateFunctions::first_val_rewrite_update<DoubleVal>(FunctionContext*,
-                                                                      const DoubleVal& src,
-                                                                      const BigIntVal&,
-                                                                      DoubleVal* dst);
-template void AggregateFunctions::first_val_rewrite_update<StringVal>(FunctionContext*,
-                                                                      const StringVal& src,
-                                                                      const BigIntVal&,
-                                                                      StringVal* dst);
-template void AggregateFunctions::first_val_rewrite_update<DateTimeVal>(FunctionContext*,
-                                                                        const DateTimeVal& src,
-                                                                        const BigIntVal&,
-                                                                        DateTimeVal* dst);
-template void AggregateFunctions::first_val_rewrite_update<DecimalV2Val>(FunctionContext*,
-                                                                         const DecimalV2Val& src,
-                                                                         const BigIntVal&,
-                                                                         DecimalV2Val* dst);
-
-template void AggregateFunctions::first_val_update<doris_udf::DecimalV2Val>(
-        doris_udf::FunctionContext*, doris_udf::DecimalV2Val const&, doris_udf::DecimalV2Val*);
-
-template void AggregateFunctions::last_val_update<BooleanVal>(FunctionContext*,
-                                                              const BooleanVal& src,
-                                                              BooleanVal* dst);
-template void AggregateFunctions::last_val_update<TinyIntVal>(FunctionContext*,
-                                                              const TinyIntVal& src,
-                                                              TinyIntVal* dst);
-template void AggregateFunctions::last_val_update<SmallIntVal>(FunctionContext*,
-                                                               const SmallIntVal& src,
-                                                               SmallIntVal* dst);
-template void AggregateFunctions::last_val_update<IntVal>(FunctionContext*, const IntVal& src,
-                                                          IntVal* dst);
-template void AggregateFunctions::last_val_update<BigIntVal>(FunctionContext*, const BigIntVal& src,
-                                                             BigIntVal* dst);
-template void AggregateFunctions::last_val_update<FloatVal>(FunctionContext*, const FloatVal& src,
-                                                            FloatVal* dst);
-template void AggregateFunctions::last_val_update<DoubleVal>(FunctionContext*, const DoubleVal& src,
-                                                             DoubleVal* dst);
-template void AggregateFunctions::last_val_update<DateTimeVal>(FunctionContext*,
-                                                               const DateTimeVal& src,
-                                                               DateTimeVal* dst);
-template void AggregateFunctions::last_val_update<DecimalV2Val>(FunctionContext*,
-                                                                const DecimalV2Val& src,
-                                                                DecimalV2Val* dst);
-
-template void AggregateFunctions::last_val_remove<BooleanVal>(FunctionContext*,
-                                                              const BooleanVal& src,
-                                                              BooleanVal* dst);
-template void AggregateFunctions::last_val_remove<TinyIntVal>(FunctionContext*,
-                                                              const TinyIntVal& src,
-                                                              TinyIntVal* dst);
-template void AggregateFunctions::last_val_remove<SmallIntVal>(FunctionContext*,
-                                                               const SmallIntVal& src,
-                                                               SmallIntVal* dst);
-template void AggregateFunctions::last_val_remove<IntVal>(FunctionContext*, const IntVal& src,
-                                                          IntVal* dst);
-template void AggregateFunctions::last_val_remove<BigIntVal>(FunctionContext*, const BigIntVal& src,
-                                                             BigIntVal* dst);
-template void AggregateFunctions::last_val_remove<FloatVal>(FunctionContext*, const FloatVal& src,
-                                                            FloatVal* dst);
-template void AggregateFunctions::last_val_remove<DoubleVal>(FunctionContext*, const DoubleVal& src,
-                                                             DoubleVal* dst);
-template void AggregateFunctions::last_val_remove<DateTimeVal>(FunctionContext*,
-                                                               const DateTimeVal& src,
-                                                               DateTimeVal* dst);
-template void AggregateFunctions::last_val_remove<DecimalV2Val>(FunctionContext*,
-                                                                const DecimalV2Val& src,
-                                                                DecimalV2Val* dst);
-
-template void AggregateFunctions::offset_fn_init<BooleanVal>(FunctionContext*, BooleanVal*);
-template void AggregateFunctions::offset_fn_init<TinyIntVal>(FunctionContext*, TinyIntVal*);
-template void AggregateFunctions::offset_fn_init<SmallIntVal>(FunctionContext*, SmallIntVal*);
-template void AggregateFunctions::offset_fn_init<IntVal>(FunctionContext*, IntVal*);
-template void AggregateFunctions::offset_fn_init<BigIntVal>(FunctionContext*, BigIntVal*);
-template void AggregateFunctions::offset_fn_init<FloatVal>(FunctionContext*, FloatVal*);
-template void AggregateFunctions::offset_fn_init<DoubleVal>(FunctionContext*, DoubleVal*);
-template void AggregateFunctions::offset_fn_init<DateTimeVal>(FunctionContext*, DateTimeVal*);
-template void AggregateFunctions::offset_fn_init<DecimalV2Val>(FunctionContext*, DecimalV2Val*);
-
-template void AggregateFunctions::offset_fn_update<BooleanVal>(FunctionContext*,
-                                                               const BooleanVal& src,
-                                                               const BigIntVal&, const BooleanVal&,
-                                                               BooleanVal* dst);
-template void AggregateFunctions::offset_fn_update<TinyIntVal>(FunctionContext*,
-                                                               const TinyIntVal& src,
-                                                               const BigIntVal&, const TinyIntVal&,
-                                                               TinyIntVal* dst);
-template void AggregateFunctions::offset_fn_update<SmallIntVal>(FunctionContext*,
-                                                                const SmallIntVal& src,
-                                                                const BigIntVal&,
-                                                                const SmallIntVal&,
-                                                                SmallIntVal* dst);
-template void AggregateFunctions::offset_fn_update<IntVal>(FunctionContext*, const IntVal& src,
-                                                           const BigIntVal&, const IntVal&,
-                                                           IntVal* dst);
-template void AggregateFunctions::offset_fn_update<BigIntVal>(FunctionContext*,
-                                                              const BigIntVal& src,
-                                                              const BigIntVal&, const BigIntVal&,
-                                                              BigIntVal* dst);
-template void AggregateFunctions::offset_fn_update<FloatVal>(FunctionContext*, const FloatVal& src,
-                                                             const BigIntVal&, const FloatVal&,
-                                                             FloatVal* dst);
-template void AggregateFunctions::offset_fn_update<DoubleVal>(FunctionContext*,
-                                                              const DoubleVal& src,
-                                                              const BigIntVal&, const DoubleVal&,
-                                                              DoubleVal* dst);
-template void AggregateFunctions::offset_fn_update<StringVal>(FunctionContext*,
-                                                              const StringVal& src,
-                                                              const BigIntVal&, const StringVal&,
-                                                              StringVal* dst);
-template void AggregateFunctions::offset_fn_update<DateTimeVal>(FunctionContext*,
-                                                                const DateTimeVal& src,
-                                                                const BigIntVal&,
-                                                                const DateTimeVal&,
-                                                                DateTimeVal* dst);
-template void AggregateFunctions::offset_fn_update<DecimalV2Val>(FunctionContext*,
-                                                                 const DecimalV2Val& src,
-                                                                 const BigIntVal&,
-                                                                 const DecimalV2Val&,
-                                                                 DecimalV2Val* dst);
-
-template void AggregateFunctions::percentile_update<BigIntVal>(FunctionContext* ctx,
-                                                               const BigIntVal&, const DoubleVal&,
-                                                               StringVal*);
-
-template void AggregateFunctions::percentile_approx_update<doris_udf::DoubleVal>(
-        FunctionContext* ctx, const doris_udf::DoubleVal&, const doris_udf::DoubleVal&,
-        doris_udf::StringVal*);
-
-template void AggregateFunctions::percentile_approx_update<doris_udf::DoubleVal>(
-        FunctionContext* ctx, const doris_udf::DoubleVal&, const doris_udf::DoubleVal&,
-        const doris_udf::DoubleVal&, doris_udf::StringVal*);
-
-} // namespace doris
diff --git a/be/src/exprs/aggregate_functions.h b/be/src/exprs/aggregate_functions.h
deleted file mode 100644
index 22407109d0..0000000000
--- a/be/src/exprs/aggregate_functions.h
+++ /dev/null
@@ -1,422 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// https://github.com/apache/impala/blob/branch-2.9.0/be/src/exprs/aggregate-functions.h
-// and modified by Doris
-
-#pragma once
-
-#include "udf/udf.h"
-
-namespace doris {
-
-class HllSetResolver;
-class HybridSetBase;
-
-// Collection of builtin aggregate functions. Aggregate functions implement
-// the various phases of the aggregation: Init(), Update(), Serialize(), Merge(),
-// and Finalize(). Not all functions need to implement all of the steps and
-// some of the parts can be reused across different aggregate functions.
-// These functions are implemented using the UDA interface.
-
-class AggregateFunctions {
-public:
-    // Initializes dst to nullptr.
-    static void init_null(doris_udf::FunctionContext*, doris_udf::AnyVal* dst);
-    // Initializes dst to nullptr and sets dst->ptr to nullptr.
-    static void init_null_string(doris_udf::FunctionContext* c, doris_udf::StringVal* dst);
-
-    // Initializes dst to 0 and is_null = true.
-    template <typename T>
-    static void init_zero(doris_udf::FunctionContext*, T* dst);
-
-    // Initializes dst to 0 and is_null = true.
-    template <typename T>
-    static void init_zero_null(doris_udf::FunctionContext*, T* dst);
-
-    // Initializes dst to 0.
-    template <typename T>
-    static void init_zero_not_null(doris_udf::FunctionContext*, T* dst);
-
-    template <typename SRC_VAL, typename DST_VAL>
-    static void sum_remove(doris_udf::FunctionContext* ctx, const SRC_VAL& src, DST_VAL* dst);
-
-    // doris_udf::StringVal GetValue() function that returns a copy of src
-    static doris_udf::StringVal string_val_get_value(doris_udf::FunctionContext* ctx,
-                                                     const doris_udf::StringVal& src);
-    static doris_udf::StringVal string_val_serialize_or_finalize(doris_udf::FunctionContext* ctx,
-                                                                 const doris_udf::StringVal& src);
-
-    // Implementation of Count and Count(*)
-    static void count_update(doris_udf::FunctionContext*, const doris_udf::AnyVal& src,
-                             doris_udf::BigIntVal* dst);
-    static void count_merge(doris_udf::FunctionContext*, const doris_udf::BigIntVal& src,
-                            doris_udf::BigIntVal* dst);
-    static void count_remove(doris_udf::FunctionContext*, const doris_udf::AnyVal& src,
-                             doris_udf::BigIntVal* dst);
-    static void count_star_update(doris_udf::FunctionContext*, doris_udf::BigIntVal* dst);
-
-    static void count_star_remove(FunctionContext*, BigIntVal* dst);
-
-    // Implementation of percentile
-    static void percentile_init(FunctionContext* ctx, StringVal* dst);
-
-    template <typename T>
-    static void percentile_update(FunctionContext* ctx, const T& src, const DoubleVal& quantile,
-                                  StringVal* dst);
-
-    static void percentile_merge(FunctionContext* ctx, const StringVal& src, StringVal* dst);
-
-    static StringVal percentile_serialize(FunctionContext* ctx, const StringVal& state_sv);
-
-    static DoubleVal percentile_finalize(FunctionContext* ctx, const StringVal& src);
-
-    // Implementation of percentile_approx
-    static void percentile_approx_init(doris_udf::FunctionContext* ctx, doris_udf::StringVal* dst);
-
-    template <typename T>
-    static void percentile_approx_update(FunctionContext* ctx, const T& src,
-                                         const DoubleVal& quantile, StringVal* dst);
-
-    template <typename T>
-    static void percentile_approx_update(FunctionContext* ctx, const T& src,
-                                         const DoubleVal& quantile,
-                                         const DoubleVal& digest_compression, StringVal* dst);
-
-    static void percentile_approx_merge(FunctionContext* ctx, const StringVal& src, StringVal* dst);
-
-    static DoubleVal percentile_approx_finalize(FunctionContext* ctx, const StringVal& src);
-
-    static StringVal percentile_approx_serialize(FunctionContext* ctx, const StringVal& state_sv);
-
-    // Implementation of Avg.
-    // TODO: Change this to use a fixed-sized BufferVal as intermediate type.
-    static void avg_init(doris_udf::FunctionContext* ctx, doris_udf::StringVal* dst);
-    template <typename T>
-    static void avg_update(doris_udf::FunctionContext* ctx, const T& src,
-                           doris_udf::StringVal* dst);
-    template <typename T>
-    static void avg_remove(doris_udf::FunctionContext* ctx, const T& src,
-                           doris_udf::StringVal* dst);
-    static void avg_merge(FunctionContext* ctx, const StringVal& src, StringVal* dst);
-    static doris_udf::DoubleVal avg_get_value(doris_udf::FunctionContext* ctx,
-                                              const doris_udf::StringVal& val);
-    static doris_udf::DoubleVal avg_finalize(doris_udf::FunctionContext* ctx,
-                                             const doris_udf::StringVal& val);
-
-    // Avg for timestamp. Uses avg_init() and avg_merge().
-    static void timestamp_avg_update(doris_udf::FunctionContext* ctx,
-                                     const doris_udf::DateTimeVal& src, doris_udf::StringVal* dst);
-    static void timestamp_avg_remove(doris_udf::FunctionContext* ctx,
-                                     const doris_udf::DateTimeVal& src, doris_udf::StringVal* dst);
-    static doris_udf::DateTimeVal timestamp_avg_get_value(doris_udf::FunctionContext* ctx,
-                                                          const doris_udf::StringVal& val);
-    static doris_udf::DateTimeVal timestamp_avg_finalize(doris_udf::FunctionContext* ctx,
-                                                         const doris_udf::StringVal& val);
-
-    // Avg for decimals.
-    static void decimalv2_avg_init(doris_udf::FunctionContext* ctx, doris_udf::StringVal* dst);
-    static void decimalv2_avg_update(doris_udf::FunctionContext* ctx,
-                                     const doris_udf::DecimalV2Val& src, doris_udf::StringVal* dst);
-    static void decimalv2_avg_merge(FunctionContext* ctx, const doris_udf::StringVal& src,
-                                    doris_udf::StringVal* dst);
-    static doris_udf::StringVal decimalv2_avg_serialize(doris_udf::FunctionContext* ctx,
-                                                        const doris_udf::StringVal& src);
-    static void decimalv2_avg_remove(doris_udf::FunctionContext* ctx,
-                                     const doris_udf::DecimalV2Val& src, doris_udf::StringVal* dst);
-
-    static doris_udf::DecimalV2Val decimalv2_avg_get_value(doris_udf::FunctionContext* ctx,
-                                                           const doris_udf::StringVal& val);
-    static doris_udf::DecimalV2Val decimalv2_avg_finalize(doris_udf::FunctionContext* ctx,
-                                                          const doris_udf::StringVal& val);
-    // SumUpdate, SumMerge
-    template <typename SRC_VAL, typename DST_VAL>
-    static void sum(doris_udf::FunctionContext*, const SRC_VAL& src, DST_VAL* dst);
-
-    // MinInit
-    template <typename T>
-    static void min_init(doris_udf::FunctionContext*, T* dst);
-
-    // MinUpdate/MinMerge
-    template <typename T>
-    static void min(doris_udf::FunctionContext*, const T& src, T* dst);
-
-    // MaxInit
-    template <typename T>
-    static void max_init(doris_udf::FunctionContext*, T* dst);
-
-    // MaxUpdate/MaxMerge
-    template <typename T>
-    static void max(doris_udf::FunctionContext*, const T& src, T* dst);
-
-    // AnyInit
-    template <typename T>
-    static void any_init(doris_udf::FunctionContext*, T* dst);
-
-    // AnyUpdate/AnyMerge
-    template <typename T>
-    static void any(doris_udf::FunctionContext*, const T& src, T* dst);
-
-    // String concat
-    static void string_concat(doris_udf::FunctionContext*, const doris_udf::StringVal& src,
-                              const doris_udf::StringVal& separator, doris_udf::StringVal* result);
-
-    /// String concat
-    static void string_concat_update(FunctionContext*, const StringVal& src, StringVal* result);
-    static void string_concat_update(FunctionContext*, const StringVal& src,
-                                     const StringVal& separator, StringVal* result);
-    static void string_concat_merge(FunctionContext*, const StringVal& src, StringVal* result);
-    static StringVal string_concat_finalize(FunctionContext*, const StringVal& src);
-
-    // Probabilistic Counting (PC), a distinct-count estimation algorithm.
-    // Probabilistic Counting with Stochastic Averaging (PCSA) is a variant
-    // of PC that runs faster and usually gets equally accurate results.
-    static void pc_init(doris_udf::FunctionContext*, doris_udf::StringVal* slot);
-
-    template <typename T>
-    static void pc_update(doris_udf::FunctionContext*, const T& src, doris_udf::StringVal* dst);
-    template <typename T>
-    static void pcsa_update(doris_udf::FunctionContext*, const T& src, doris_udf::StringVal* dst);
-
-    static void pc_merge(doris_udf::FunctionContext*, const doris_udf::StringVal& src,
-                         doris_udf::StringVal* dst);
-
-    static doris_udf::StringVal pc_finalize(doris_udf::FunctionContext*,
-                                            const doris_udf::StringVal& src);
-
-    static doris_udf::StringVal pcsa_finalize(doris_udf::FunctionContext*,
-                                              const doris_udf::StringVal& src);
-
-    // count and sum distinct algorithm in multi distinct
-    template <typename T>
-    static void count_or_sum_distinct_numeric_init(doris_udf::FunctionContext* ctx,
-                                                   doris_udf::StringVal* dst);
-    template <typename T>
-    static void count_or_sum_distinct_numeric_update(FunctionContext* ctx, T& src, StringVal* dst);
-    template <typename T>
-    static void count_or_sum_distinct_numeric_merge(FunctionContext* ctx, StringVal& src,
-                                                    StringVal* dst);
-    template <typename T>
-    static StringVal count_or_sum_distinct_numeric_serialize(FunctionContext* ctx,
-                                                             const StringVal& state_sv);
-    template <typename T>
-    static BigIntVal count_or_sum_distinct_numeric_finalize(FunctionContext* ctx,
-                                                            const StringVal& state_sv);
-
-    // count distinct in multi distinct for string
-    static void count_distinct_string_init(doris_udf::FunctionContext* ctx,
-                                           doris_udf::StringVal* dst);
-    static void count_distinct_string_update(FunctionContext* ctx, StringVal& src, StringVal* dst);
-    static void count_distinct_string_merge(FunctionContext* ctx, StringVal& src, StringVal* dst);
-    static StringVal count_distinct_string_serialize(FunctionContext* ctx,
-                                                     const StringVal& state_sv);
-    static BigIntVal count_distinct_string_finalize(FunctionContext* ctx,
-                                                    const StringVal& state_sv);
-
-    // count and sum distinct in multi distinct for decimal
-    static void count_or_sum_distinct_decimal_init(doris_udf::FunctionContext* ctx,
-                                                   doris_udf::StringVal* dst);
-    static void count_or_sum_distinct_decimalv2_init(doris_udf::FunctionContext* ctx,
-                                                     doris_udf::StringVal* dst);
-    static void count_or_sum_distinct_decimalv2_update(FunctionContext* ctx, DecimalV2Val& src,
-                                                       StringVal* dst);
-    static void count_or_sum_distinct_decimal_merge(FunctionContext* ctx, StringVal& src,
-                                                    StringVal* dst);
-    static void count_or_sum_distinct_decimalv2_merge(FunctionContext* ctx, StringVal& src,
-                                                      StringVal* dst);
-
-    static StringVal count_or_sum_distinct_decimalv2_serialize(FunctionContext* ctx,
-                                                               const StringVal& state_sv);
-
-    static BigIntVal count_distinct_decimalv2_finalize(FunctionContext* ctx,
-                                                       const StringVal& state_sv);
-    static DecimalV2Val sum_distinct_decimalv2_finalize(FunctionContext* ctx,
-                                                        const StringVal& state_sv);
-
-    // count distinct in multi distinct for Date
-    static void count_distinct_date_init(doris_udf::FunctionContext* ctx,
-                                         doris_udf::StringVal* dst);
-    static void count_distinct_date_update(FunctionContext* ctx, DateTimeVal& src, StringVal* dst);
-    static void count_distinct_date_merge(FunctionContext* ctx, StringVal& src, StringVal* dst);
-    static StringVal count_distinct_date_serialize(FunctionContext* ctx, const StringVal& state_sv);
-    static BigIntVal count_distinct_date_finalize(FunctionContext* ctx, const StringVal& state_sv);
-
-    template <typename T>
-    static BigIntVal sum_distinct_bigint_finalize(FunctionContext* ctx, const StringVal& state_sv);
-    template <typename T>
-    static LargeIntVal sum_distinct_largeint_finalize(FunctionContext* ctx,
-                                                      const StringVal& state_sv);
-    template <typename T>
-    static DoubleVal sum_distinct_double_finalize(FunctionContext* ctx, const StringVal& state_sv);
-
-    /// Knuth's variance algorithm, more numerically stable than canonical stddev
-    /// algorithms; reference implementation:
-    /// http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm
-    static void knuth_var_init(FunctionContext* context, StringVal* val);
-    template <typename T>
-    static void knuth_var_update(FunctionContext* context, const T& input, StringVal* val);
-    template <typename T>
-    static void knuth_var_remove(FunctionContext* context, const T& src, StringVal* dst);
-    static void knuth_var_merge(FunctionContext* context, const StringVal& src, StringVal* dst);
-    static DoubleVal knuth_var_finalize(FunctionContext* context, const StringVal& val);
-
-    /// Calculates the biased variance, uses KnuthVar Init-Update-Merge functions
-    static DoubleVal knuth_var_pop_finalize(FunctionContext* context, const StringVal& val);
-
-    /// Calculates STDDEV, uses KnuthVar Init-Update-Merge functions
-    static DoubleVal knuth_stddev_finalize(FunctionContext* context, const StringVal& val);
-
-    /// Calculates the biased STDDEV, uses KnuthVar Init-Update-Merge functions
-    static DoubleVal knuth_stddev_pop_finalize(FunctionContext* context, const StringVal& val);
-
-    static DoubleVal knuth_var_get_value(FunctionContext* ctx, const StringVal& state_sv);
-    static DoubleVal knuth_var_pop_get_value(FunctionContext* context, const StringVal& val);
-    static DoubleVal knuth_stddev_get_value(FunctionContext* ctx, const StringVal& state_sv);
-    static DoubleVal knuth_stddev_pop_get_value(FunctionContext* context, const StringVal& val);
-
-    // variance/stddev for decimals.
-    static void decimalv2_knuth_var_init(FunctionContext* context, StringVal* val);
-    static void knuth_var_remove(FunctionContext* ctx, const DecimalV2Val& src, StringVal* dst);
-    static void knuth_var_update(FunctionContext* context, const DecimalV2Val& src, StringVal* val);
-    static void decimalv2_knuth_var_merge(FunctionContext* context, const StringVal& src,
-                                          StringVal* val);
-    static DecimalV2Val decimalv2_knuth_var_finalize(FunctionContext* context,
-                                                     const StringVal& val);
-    static DecimalV2Val decimalv2_knuth_var_pop_finalize(FunctionContext* context,
-                                                         const StringVal& val);
-    static DecimalV2Val decimalv2_knuth_stddev_finalize(FunctionContext* context,
-                                                        const StringVal& val);
-    static DecimalV2Val decimalv2_knuth_stddev_pop_finalize(FunctionContext* context,
-                                                            const StringVal& val);
-
-    static DecimalV2Val decimalv2_knuth_var_get_value(FunctionContext* ctx,
-                                                      const StringVal& state_sv);
-    static DecimalV2Val decimalv2_knuth_var_pop_get_value(FunctionContext* context,
-                                                          const StringVal& val);
-    static DecimalV2Val decimalv2_knuth_stddev_get_value(FunctionContext* context,
-                                                         const StringVal& val);
-    static DecimalV2Val decimalv2_knuth_stddev_pop_get_value(FunctionContext* context,
-                                                             const StringVal& val);
-
-    /// ----------------------------- Analytic Functions ---------------------------------
-    /// Analytic functions implement the UDA interface (except Merge(), Serialize()) and are
-    /// used internally by the AnalyticEvalNode. Some analytic functions store intermediate
-    /// state as a StringVal which is needed for multiple calls to Finalize(), so some fns
-    /// also implement a (private) GetValue() method to just return the value. In that
-    /// case, Finalize() is only called at the end to clean up.
-
-    // Initializes the state for RANK and DENSE_RANK
-    static void rank_init(doris_udf::FunctionContext*, doris_udf::StringVal* slot);
-
-    // Update state for RANK
-    static void rank_update(doris_udf::FunctionContext*, doris_udf::StringVal* dst);
-
-    // Update state for DENSE_RANK
-    static void dense_rank_update(doris_udf::FunctionContext*, doris_udf::StringVal* dst);
-
-    // Returns the result for RANK and prepares the state for the next Update().
-    static doris_udf::BigIntVal rank_get_value(doris_udf::FunctionContext*,
-                                               doris_udf::StringVal& src);
-
-    // Returns the result for DENSE_RANK and prepares the state for the next Update().
-    // TODO: Implement DENSE_RANK with a single doris_udf::BigIntVal. Requires src can be modified,
-    // AggFnEvaluator would need to handle copying the src doris_udf::AnyVal back into the src slot.
-    static doris_udf::BigIntVal dense_rank_get_value(doris_udf::FunctionContext*,
-                                                     doris_udf::StringVal& src);
-
-    // Returns the result for RANK and DENSE_RANK and cleans up intermediate state in src.
-    static doris_udf::BigIntVal rank_finalize(doris_udf::FunctionContext*,
-                                              doris_udf::StringVal& src);
-
-    // Implements LAST_VALUE.
-    template <typename T>
-    static void last_val_update(doris_udf::FunctionContext*, const T& src, T* dst);
-    template <typename T>
-    static void last_val_remove(doris_udf::FunctionContext*, const T& src, T* dst);
-
-    // Implements FIRST_VALUE.
-    template <typename T>
-    static void first_val_update(doris_udf::FunctionContext*, const T& src, T* dst);
-    // Implements FIRST_VALUE for some windows that require rewrites during planning.
-    // The doris_udf::BigIntVal is unused by first_val_rewrite_update() (it is used by the
-    // AnalyticEvalNode).
-    template <typename T>
-    static void first_val_rewrite_update(doris_udf::FunctionContext*, const T& src,
-                                         const doris_udf::BigIntVal&, T* dst);
-
-    // OffsetFn*() implement LAG and LEAD. Init() sets the default value (the last
-    // constant parameter) as dst.
-    template <typename T>
-    static void offset_fn_init(doris_udf::FunctionContext*, T* dst);
-
-    // Update() takes all the parameters to LEAD/LAG, including the integer offset and
-    // the default value, neither of which is needed by Update(). (The offset is already
-    // used in the window for the analytic fn evaluation and the default value is set
-    // in Init().)
-    template <typename T>
-    static void offset_fn_update(doris_udf::FunctionContext*, const T& src,
-                                 const doris_udf::BigIntVal&, const T&, T* dst);
-
-    // windowFunnel
-    static void window_funnel_init(FunctionContext* ctx, StringVal* dst);
-    static void window_funnel_update(FunctionContext* ctx, const BigIntVal& window,
-                                     const StringVal& mode, const DateTimeVal& timestamp,
-                                     int num_cond, const BooleanVal* conds, StringVal* dst);
-    static void window_funnel_merge(FunctionContext* ctx, const StringVal& src, StringVal* dst);
-    static StringVal window_funnel_serialize(FunctionContext* ctx, const StringVal& src);
-    static IntVal window_funnel_finalize(FunctionContext* ctx, const StringVal& src);
-
-    // todo(kks): keep the following HLL methods only for backward compatibility; we should remove these methods
-    //            when Doris 0.12 is released
-    static void hll_init(doris_udf::FunctionContext*, doris_udf::StringVal* slot);
-    template <typename T>
-    static void hll_update(doris_udf::FunctionContext*, const T& src, doris_udf::StringVal* dst);
-    static void hll_merge(doris_udf::FunctionContext*, const doris_udf::StringVal& src,
-                          doris_udf::StringVal* dst);
-    static doris_udf::StringVal hll_finalize(doris_udf::FunctionContext*,
-                                             const doris_udf::StringVal& src);
-
-    static void hll_union_agg_init(doris_udf::FunctionContext*, doris_udf::HllVal* slot);
-    // fill all register according to hll set type
-    static void hll_union_agg_update(doris_udf::FunctionContext*, const doris_udf::HllVal& src,
-                                     doris_udf::HllVal* dst);
-    // merge the register value
-    static void hll_union_agg_merge(doris_udf::FunctionContext*, const doris_udf::HllVal& src,
-                                    doris_udf::HllVal* dst);
-    // return result
-    static doris_udf::BigIntVal hll_union_agg_finalize(doris_udf::FunctionContext*,
-                                                       const doris_udf::HllVal& src);
-
-    // calculate result
-    static int64_t hll_algorithm(uint8_t* pdata, int data_len);
-    static int64_t hll_algorithm(const StringVal& dst) { return hll_algorithm(dst.ptr, dst.len); }
-    static int64_t hll_algorithm(const HllVal& dst) {
-        return hll_algorithm(dst.ptr + 1, dst.len - 1);
-    }
-
-    // Aggregates HLL-typed values into an HLL-typed value
-    static void hll_raw_agg_init(doris_udf::FunctionContext*, doris_udf::HllVal* slot);
-    static void hll_raw_agg_update(doris_udf::FunctionContext*, const doris_udf::HllVal& src,
-                                   doris_udf::HllVal* dst);
-    static void hll_raw_agg_merge(doris_udf::FunctionContext*, const doris_udf::HllVal& src,
-                                  doris_udf::HllVal* dst);
-    // return result which is HLL type
-    static doris_udf::HllVal hll_raw_agg_finalize(doris_udf::FunctionContext*,
-                                                  const doris_udf::HllVal& src);
-};
-
-} // namespace doris
diff --git a/be/src/exprs/time_operators.cpp b/be/src/exprs/time_operators.cpp
deleted file mode 100644
index 8ee617c1d2..0000000000
--- a/be/src/exprs/time_operators.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exprs/time_operators.h"
-
-#include <math.h>
-
-#include <iomanip>
-#include <sstream>
-
-#include "exprs/anyval_util.h"
-#include "util/date_func.h"
-
-namespace doris {
-void TimeOperators::init() {}
-
-#define CAST_TIME_TO_INT(to_type, type_name)                                                     \
-    to_type TimeOperators::cast_to_##type_name(FunctionContext* context, const DoubleVal& val) { \
-        if (val.is_null) return to_type::null();                                                 \
-        int time = (int)val.val;                                                                 \
-        int second = time % 60;                                                                  \
-        int minute = time / 60 % 60;                                                             \
-        int hour = time / 3600;                                                                  \
-        return to_type(hour * 10000 + minute * 100 + second);                                    \
-    }
-
-#define CAST_FROM_TIME()                          \
-    CAST_TIME_TO_INT(BooleanVal, boolean_val);    \
-    CAST_TIME_TO_INT(TinyIntVal, tiny_int_val);   \
-    CAST_TIME_TO_INT(SmallIntVal, small_int_val); \
-    CAST_TIME_TO_INT(IntVal, int_val);            \
-    CAST_TIME_TO_INT(BigIntVal, big_int_val);     \
-    CAST_TIME_TO_INT(LargeIntVal, large_int_val); \
-    CAST_TIME_TO_INT(FloatVal, float_val);        \
-    CAST_TIME_TO_INT(DoubleVal, double_val);
-
-CAST_FROM_TIME();
-
-StringVal TimeOperators::cast_to_string_val(FunctionContext* ctx, const DoubleVal& val) {
-    if (val.is_null) {
-        return StringVal::null();
-    }
-    char buffer[MAX_TIME_WIDTH];
-    int len = time_to_buffer_from_double(val.val, buffer);
-    return AnyValUtil::from_buffer_temp(ctx, buffer, len);
-}
-
-DateTimeVal TimeOperators::cast_to_datetime_val(FunctionContext* context, const DoubleVal& val) {
-    return DateTimeVal::null();
-}
-} // namespace doris
diff --git a/be/src/exprs/time_operators.h b/be/src/exprs/time_operators.h
deleted file mode 100644
index bb2f9f7f27..0000000000
--- a/be/src/exprs/time_operators.h
+++ /dev/null
@@ -1,43 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-#include "udf/udf.h"
-
-namespace doris {
-
-/// Implementation of the time operators. Only init() and the cast
-/// operators (cast_to_*_val) are declared in this class.
-class TimeOperators {
-public:
-    static void init();
-
-    static BooleanVal cast_to_boolean_val(FunctionContext*, const DoubleVal&);
-    static TinyIntVal cast_to_tiny_int_val(FunctionContext*, const DoubleVal&);
-    static SmallIntVal cast_to_small_int_val(FunctionContext*, const DoubleVal&);
-    static IntVal cast_to_int_val(FunctionContext*, const DoubleVal&);
-    static BigIntVal cast_to_big_int_val(FunctionContext*, const DoubleVal&);
-    static LargeIntVal cast_to_large_int_val(FunctionContext*, const DoubleVal&);
-    static FloatVal cast_to_float_val(FunctionContext*, const DoubleVal&);
-    static DoubleVal cast_to_double_val(FunctionContext*, const DoubleVal&);
-    static StringVal cast_to_string_val(FunctionContext*, const DoubleVal&);
-    static DateTimeVal cast_to_datetime_val(FunctionContext*, const DoubleVal&);
-};
-} // namespace doris
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index b0f1c4b957..e2342331a8 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -29,10 +29,8 @@ add_library(Olap STATIC
     base_tablet.cpp
     bloom_filter.hpp
     block_column_predicate.cpp
-    byte_buffer.cpp
     compaction.cpp
-    compaction_permit_limiter.cpp
-    compress.cpp
+    compaction_permit_limiter.cpp   
     cumulative_compaction.cpp
     cumulative_compaction_policy.cpp
     delete_handler.cpp
diff --git a/be/src/olap/bhp_lib.h b/be/src/olap/bhp_lib.h
deleted file mode 100644
index 043bde85a5..0000000000
--- a/be/src/olap/bhp_lib.h
+++ /dev/null
@@ -1,753 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stddef.h>
-
-namespace doris {
-
-inline int memcmp_sse(const void* buf1, const void* buf2, unsigned int count) {
-    int result;
-
-    __asm__ __volatile__(
-            "cmpl $16, %%edx;"
-            "jb 9f;"
-            "16:" /*  len >= 16 */
-            "movdqu  (%%rdi), %%xmm0;"
-            "movdqu  (%%rsi), %%xmm1;"
-            "pcmpeqb %%xmm1, %%xmm0;"
-            "pmovmskb %%xmm0,%%rcx;"
-            "xorl $0xffff, %%ecx;"
-            "jz 15f;"
-            "bsf %%ecx, %%ecx;" /* diff */
-            "movzb (%%rsi, %%rcx), %%edx;"
-            "movzb (%%rdi, %%rcx), %%eax;"
-            "subl %%edx, %%eax;"
-            "jmp 0f;"
-            "15:" /* same */
-            "subl $16, %%edx;"
-            "jbe 1f;"
-            "movq $16, %%rcx;"
-            "cmpl $16, %%edx;"
-            "jae 14f;"
-            "movl %%edx, %%ecx;"
-            "14:"
-            //"addq %%rcx, %%rdi;"
-            "lea (%%rdi,%%rcx), %%rdi;"
-            "addq %%rcx, %%rsi;"
-            "jmp 16b;"
-
-            "9:" /* 8 =< len < 15 */
-            "cmpl $8, %%edx;"
-            "jb 5f;"
-            "8:"
-            "movq (%%rdi), %%xmm0;"
-            "movq (%%rsi), %%xmm1;"
-            "pcmpeqb %%xmm1, %%xmm0;"
-            "pmovmskb %%xmm0, %%rcx;"
-            "and $0xff, %%ecx;"
-            "xorl $0xff, %%ecx;"
-            "je 7f;"
-            "bsf  %%ecx, %%ecx;" /* diff */
-            "movzb (%%rsi, %%rcx), %%edx;"
-            "movzb (%%rdi, %%rcx), %%eax;"
-            "subl %%edx, %%eax;"
-            "jmp 0f;"
-
-            "7:"
-            "subl $8, %%edx;"
-            "jz 1f;"
-            "movl %%edx, %%ecx;"
-            "movq (%%rdi, %%rcx), %%xmm0;"
-            "movq (%%rsi, %%rcx), %%xmm1;"
-            "pcmpeqb %%xmm1, %%xmm0;"
-            "pmovmskb %%xmm0, %%rcx;"
-            "and $0xff, %%ecx;"
-            "xorl $0xff, %%ecx;"
-            "je 1f;"
-            "bsf  %%ecx, %%ecx;"
-            "addl %%edx, %%ecx;"
-            "movzb (%%rsi, %%rcx), %%edx;"
-            "movzb (%%rdi, %%rcx), %%eax;"
-            "subl %%edx, %%eax;"
-            "jmp 0f;"
-
-            "5:"
-            "cmpl $4, %%edx;"
-            "jb 13f;"
-            "4:"
-            "subl $4, %%edx;"
-            "movl (%%rdi), %%eax;"
-            "movl (%%rsi), %%ecx;"
-            "cmpl  %%ecx, %%eax;"
-            "je 3f;"
-            "bswap %%eax;"
-            "bswap %%ecx;"
-            "cmpl %%ecx, %%eax;"
-            "ja 17f;"
-            "mov $-1, %%eax;"
-            "jmp 0f;"
-            "17:"
-            "mov $1, %%eax;"
-            "jmp 0f;"
-            "3:"
-            "addq $4, %%rdi;"
-            "lea 4(%%rsi), %%rsi;"
-            "13:"
-            "cmpl $0, %%edx;"
-            "je 1f;"
-            "2:"
-
-            "movzbl (%%rdi), %%eax;"
-            "movzbl (%%rsi), %%ecx;"
-            "subl %%ecx, %%eax;"
-            "jne 0f;"
-            "subl $1, %%edx;"
-            "jz 1f;"
-            "movzbl 1(%%rdi), %%eax;"
-            "movzbl 1(%%rsi), %%ecx;"
-            "subl %%ecx, %%eax;"
-            "jne 0f;"
-            "subl $1, %%edx;"
-            "jz 1f;"
-            "movzbl 2(%%rdi), %%eax;"
-            "movzbl 2(%%rsi), %%ecx;"
-            "subl %%ecx, %%eax;"
-            "jmp 0f;"
-
-            "1:"
-            "xorl %%eax, %%eax;"
-            "0:"
-            : "=a"(result), "=D"(buf1), "=S"(buf2), "=d"(count)
-            : "D"(buf1), "S"(buf2), "d"(count)
-            : "%rcx", "%xmm1", "%xmm0", "memory");
-    return result;
-}
-
-//count must be between 0 and 2GB
-/*__attribute__((always_inline))*/ inline int memcmp_sse32(const void* buf1, const void* buf2,
-                                                           int count)
-
-{
-    int result;
-    __asm__ __volatile__(
-            //".align 8;"
-            "cmp $1, %%edx;"
-            "jbe 6f;"
-
-            "addl  $16,  %%edx  ;"
-            "movl  %%edx, %%eax ;"
-            "xor  %%rcx, %%rcx ;"
-
-            "2: "
-            "movdqu  (%%rdi), %%xmm1;"
-            "movdqu  (%%rsi), %%xmm2;"
-            "subl     $16,    %%edx  ;"
-            "subl     $16,    %%eax  ;"
-
-            // " pcmpestri $0x18, %%xmm2, %%xmm1  ;"
-            ".byte 0x66, 0x0f, 0x3a, 0x61, 0xca, 0x18;"
-            " lea     16(%%rsi),    %%rsi  ;"
-            " lea     16(%%rdi),    %%rdi  ;"
-            //zflag=0 and cflag=0;no diff and no end, so continue the loop
-            " ja  2b                     ;"
-            // if cflag=1, jmp; no end but diff
-            " jc  1f                   ;"
-
-            "xorl %%eax, %%eax;"
-            "jmp 0f;"
-
-            "6:"
-            "xor %%eax, %%eax;"
-            "test %%edx, %%edx ;"
-            "jz 0f ;"
-            "movzbl (%%rdi), %%eax;"
-            "movzbl (%%rsi), %%edx;"
-            "subl %%edx, %%eax;"
-            "jmp 0f;"
-
-            "1:"
-            "movzbl  -16(%%rsi, %%rcx), %%edx   ;"
-            "movzbl  -16(%%rdi, %%rcx), %%eax   ;"
-            "subl    %%edx, %%eax               ;"
-
-            "0:"
-            //"mov %%eax, %0;"
-
-            : "=a"(result), "=D"(buf1), "=S"(buf2), "=d"(count)
-            : "D"(buf1), "S"(buf2), "d"(count)
-            : "%rcx", "memory", "xmm1", "xmm2");
-    return result;
-}
-
-/*__attribute__((always_inline))*/ inline int memcmp_sse64(const void* buf1, const void* buf2,
-                                                           size_t count) {
-    int result;
-    __asm__ __volatile__(
-            "cmp $1, %%rdx;"
-            "jbe 6f;"
-
-            "addq $16, %%rdx;"
-            "movq %%rdx,%%rax;"
-            //"xor  %%rcx, %%rcx ;"
-
-            "2: "
-            "movdqu (%%rdi), %%xmm1;"
-            "movdqu (%%rsi), %%xmm2;"
-
-            "subq $16, %%rax;"
-            "subq $16, %%rdx;"
-
-            //"addq $16, %%rsi;"
-            //"addq $16, %%rdi;"
-            //  " pcmpestri $0x18, %%xmm2, %%xmm1  ;"
-            ".byte 0x66, 0x0f, 0x3a, 0x61, 0xca, 0x18;"
-            "lea 16(%%rsi), %%rsi;"
-            "lea 16(%%rdi), %%rdi;"
-            "ja 2b;" //no diff and no end, so continue the loop
-            "jc 1f;" // no end but diff
-
-            "xorl %%eax, %%eax;"
-            "jmp 0f;"
-
-            "6:"
-            "xor %%eax, %%eax;"
-            "test %%edx, %%edx ;"
-            "jz 0f ;"
-            "movzbl (%%rdi), %%eax;"
-            "movzbl (%%rsi), %%edx;"
-            "subl %%edx, %%eax;"
-            "jmp 0f;"
-
-            "1:"
-            "movzbl  -16(%%rsi, %%rcx), %%edx   ;"
-            "movzbl  -16(%%rdi, %%rcx), %%eax   ;"
-            "subl    %%edx, %%eax               ;"
-
-            "0:"
-            //"mov %%eax, %0;"
-
-            : "=a"(result), "=D"(buf1), "=S"(buf2), "=d"(count)
-            : "D"(buf1), "S"(buf2), "d"(count)
-            : "%rcx", "memory", "xmm1", "xmm2");
-    return result;
-}
-
-/*__attribute__((always_inline))*/ inline int find_chr_from_mem(const char* s, int c, int len) {
-    //len : edx; c: esi; s:rdi
-    int index;
-    __asm__ __volatile__(
-            "and $0xff, %%esi;" //clear upper bytes
-            "movd %%esi, %%xmm1;"
-
-            "mov $1, %%eax;"
-            "add $16, %%edx;"
-            "mov %%rdi ,%%r8;"
-
-            "1:"
-            "movdqu (%%rdi), %%xmm2;"
-            "sub $16, %%edx;"
-            "addq $16, %%rdi;"
-            //"pcmpestri $0x0, %%xmm2,%%xmm1;"
-            ".byte 0x66 ,0x0f ,0x3a ,0x61 ,0xca ,0x00;"
-            //"lea 16(%%rdi), %%rdi;"
-            "ja 1b;" //Res2==0:no match and zflag==0: s is not end
-            "jc 3f;" //Res2==1: match and s is not end
-
-            "mov $0xffffffff, %%eax;" //no match
-            "jmp 0f;"
-
-            "3:"
-            "sub %%r8, %%rdi;"
-            "lea -16(%%edi,%%ecx),%%eax;"
-
-            "0:"
-            //        "mov %%eax, %0;"
-            : "=a"(index), "=D"(s), "=S"(c), "=d"(len)
-            : "D"(s), "S"(c), "d"(len)
-            : "rcx", "r8", "memory", "xmm1", "xmm2");
-    return index;
-}
-
-/*__attribute__((always_inline))*/ inline int find_chr_from_str(const char* s, int c, int len) {
-    //s:rdi; c:rsi; len:rdx
-    int index;
-    __asm__ __volatile__(
-            "and $0xff, %%esi;" //clear upper bytes
-            "movd %%esi, %%xmm1;"
-            "xor %%r8d,%%r8d;"
-
-            "1:"
-            "movdqu (%%rdi), %%xmm2;"
-            "add $16, %%r8d;"
-            "addq $16, %%rdi;"
-            //        "pcmpistri $0x0, %%xmm2,%%xmm1;"
-            ".byte 0x66 ,0x0f ,0x3a ,0x63 ,0xca ,0x00;"
-            //"lea 16(%%rdi), %%rdi;"
-            "ja 4f;"  // not null and no match, so clarify whether over the end
-            "jc 2f;"  //match
-            "jmp 3f;" //null and no match
-
-            "4:"
-            "cmp %%r8d,%%edx;"
-            "ja 1b;"
-
-            "3:"
-            "mov $0xffffffff, %%eax;" // the end and no match
-            "jmp 0f;"
-
-            "2:"
-
-            "lea -16(%%r8d, %%ecx), %%eax;"
-            "cmp %%edx, %%eax;"
-            "jae 3b;"
-
-            "0:"
-            // "mov %%eax, %0;"
-
-            : "=a"(index), "=D"(s), "=S"(c), "=d"(len)
-            : "D"(s), "S"(c), "d"(len)
-            : "rcx", "r8", "memory", "xmm1", "xmm2");
-    return index;
-}
-
-/*__attribute__((always_inline))*/ inline char* strchr_sse(const char* s, int c) {
-    //s:rdi; c:rsi
-    char* ret;
-    __asm__ __volatile__(
-            "and $0xff, %%esi;" //clear upper bytes
-            //c==0
-            "test %%esi, %%esi;"
-            "jnz 0f ;"
-            "movq %%rdi, %%rax;"
-            "pxor %%xmm1, %%xmm1;"
-            "3:"
-            "movdqu (%%rdi), %%xmm2;"
-
-            "addq $16, %%rdi;"
-            // "pcmpistri $0x8, %%xmm2,%%xmm1;"
-            ".byte 0x66, 0x0f, 0x3a, 0x63, 0xca, 0x08;"
-            "jnz 3b;"
-
-            "leaq -16(%%rdi,%%rcx), %%rax;"
-            "jmp 2f;"
-
-            "0:"
-            "movd %%esi, %%xmm1;"
-            //"xor %%rcx, %%rcx;"
-            "xor %%rax, %%rax;"
-
-            "1:"
-            "movdqu (%%rdi), %%xmm2;"
-
-            "addq $16, %%rdi;"
-            //        "pcmpistri $0x0, %%xmm2,%%xmm1;"
-            ".byte 0x66 ,0x0f ,0x3a ,0x63 ,0xca ,0x00;"
-            "ja 1b;"
-            "jnc 2f;"
-            "lea -16(%%rdi, %%rcx), %%rax;"
-            "2:"
-
-            : "=a"(ret), "=D"(s), "=S"(c)
-            : "D"(s), "S"(c)
-            : "rcx", "memory", "xmm1", "xmm2");
-    return ret;
-}
-
-/*__attribute__((always_inline))*/ inline char* strrchr_sse(const char* s, int c) {
-    //s:rdi; c:rsi
-    char* ret;
-    __asm__ __volatile__(
-            "and $0xff, %%esi;" //clear upper bytes
-            //c==0
-            "test %%esi, %%esi;"
-            "jnz 0f ;"
-
-            "movq %%rdi, %%rax;"
-            "pxor %%xmm1, %%xmm1;"
-            "3:"
-            "movdqu (%%rdi), %%xmm2;"
-
-            "addq $16, %%rdi;"
-            // "pcmpistri $0x8, %%xmm2,%%xmm1;"
-            ".byte 0x66, 0x0f, 0x3a, 0x63, 0xca, 0x08;"
-            "jnz 3b;"
-
-            "leaq -16(%%rdi,%%rcx), %%rax;"
-            "jmp 3f;"
-
-            "0:"
-            "movd %%esi, %%xmm1;"
-            //"xor %%rcx, %%rcx;"
-            "xor %%rax, %%rax;"
-
-            "1:"
-            "movdqu (%%rdi), %%xmm2;"
-
-            "addq $16, %%rdi;"
-            //        "pcmpistri $0x40, %%xmm2,%%xmm1;"
-            ".byte 0x66 ,0x0f ,0x3a ,0x63 ,0xca ,0x40;"
-            "ja 1b;" //zflag =0 and cflag =0, it means no end and no match
-
-            "jz 2f;"                        //zflag =1, the end of string
-            "lea -16(%%rdi, %%rcx), %%rax;" //cflag =1
-            "jmp 1b;"
-
-            "2:"
-            "jnc 3f;"
-            "lea -16(%%rdi, %%rcx), %%rax;"
-            "3:"
-            //"mov %%rax, %0;"
-            : "=a"(ret), "=D"(s), "=S"(c)
-            : "D"(s), "S"(c)
-            : "rcx", "memory", "xmm1", "xmm2");
-    return ret;
-}
-
-inline char* strrchr_end_sse(char const* b, char const* e, char c) {
-    //b:rdi; e:rsi; c:rdx
-    char* ret;
-
-    __asm__ __volatile__(
-
-            //       "movzbq %5, %%rdx;"
-            //  "mov %%rdx, %%r8;"
-            "movzbq %5, %%r8;"
-
-            "cmp $0, %%rdi;"
-            "jbe 1f;"
-
-            //calculate rdx, decide where to go
-            "mov %%rsi, %%rdx;"
-            "subq %%rdi, %%rdx;"
-            "jbe 1f;" // if begin >= end, return
-            "cmp $7, %%rdx;"
-            "jna 2f;"
-
-            // rdx >= 8
-            "movd %%r8, %%xmm1;"
-            "mov $1, %%rax;"
-            "cmp $16, %%rdx;"
-            "ja 3f;" //  if rdx > 16, jmp to 3f
-
-            "5:"
-            // 8 <= rdx <= 16
-            "subq %%rdx, %%rsi;"
-            "movdqu (%%rsi), %%xmm2;"
-            // "pcmpestri $0x40, %%xmm2, %%xmm1;"
-            ".byte 0x66, 0x0f, 0x3a, 0x61, 0xca, 0x40;"
-            "jnc 1f; "                   // if cflag=0, not match, jmp to 1f
-            "lea (%%rsi, %%rcx), %%rax;" // matched
-            "jmp 0f;"
-
-            // after 16-bytes compare
-            "4:"
-            "subq $16, %%rdx;"
-            "cmp $7, %%rdx;"
-            "jna 2f;" // if rdx < 8, jmp to 2f
-            "cmp $16, %%rdx;"
-            "jna 5b;"
-
-            "3:"
-            "subq $16, %%rsi;"
-            "movdqu (%%rsi), %%xmm2;"
-            // "pcmpestri $0x40, %%xmm2, %%xmm1;"
-            ".byte 0x66, 0x0f, 0x3a, 0x61, 0xca, 0x40;"
-            "ja 4b;"                     // cflag = 0:not match && zflag = 0:not end >>> loopback
-            "lea (%%rsi, %%rcx), %%rax;" // rdx > 16, zflag always = 0, match
-            "jmp 0f;"
-
-            "2:"
-            // 0 < rdx < 8
-            "mov %%r8, %%rax;"
-
-            // switch rdx;
-            "cmpb -1(%%rsi), %%al;"
-            "jne 11f;"
-            "lea -1(%%rsi), %%rax;"
-            "jmp 0f;"
-            "11:"
-            "cmp $1, %%rdx;"
-            "je 1f;"
-
-            "cmpb -2(%%rsi), %%al;"
-            "jne 12f;"
-            "lea -2(%%rsi), %%rax;"
-            "jmp 0f;"
-            "12:"
-            "cmp $2, %%rdx;"
-            "je 1f;"
-
-            "cmpb -3(%%rsi), %%al;"
-            "jne 13f;"
-            "lea -3(%%rsi), %%rax;"
-            "jmp 0f;"
-            "13:"
-            "cmp $3, %%rdx;"
-            "je 1f;"
-
-            "cmpb -4(%%rsi), %%al;"
-            "jne 14f;"
-            "lea -4(%%rsi), %%rax;"
-            "jmp 0f;"
-            "14:"
-            "cmp $4, %%rdx;"
-            "je 1f;"
-
-            "cmpb -5(%%rsi), %%al;"
-            "jne 15f;"
-            "lea -5(%%rsi), %%rax;"
-            "jmp 0f;"
-            "15:"
-            "cmp $5, %%rdx;"
-            "je 1f;"
-
-            "cmpb -6(%%rsi), %%al;"
-            "jne 16f;"
-            "lea -6(%%rsi), %%rax;"
-            "jmp 0f;"
-            "16:"
-            "cmp $6, %%rdx;"
-            "je 1f;"
-
-            "cmpb -7(%%rsi), %%al;"
-            "jne 1f;"
-            "lea -7(%%rsi), %%rax;"
-            "jmp 0f;"
-
-            // failed return
-            "1:"
-            "xor %%rax, %%rax;" // return null
-
-            // success return
-            "0:"
-
-            : "=a"(ret), "=D"(b), "=S"(e) //,"=d"(c)
-            : "D"(b), "S"(e), "r"(c)
-            : "r8", "rcx", "memory", "xmm1", "xmm2", "rdx");
-    return ret;
-}
-
-/*__attribute__((always_inline))*/ inline void* memchr_sse(const void* s, int c, size_t n) {
-    //s:rdi; c:rsi; n:rdx
-    void* ret;
-    __asm__ __volatile__(
-            "and $0xff, %%esi;" //clear upper bytes
-            "movd %%esi, %%xmm1;"
-
-            "mov $1, %%rax;"
-            "add $16, %%rdx;"
-
-            "1:"
-            "movdqu (%%rdi), %%xmm2;"
-            "sub $16, %%rdx;"
-            "addq $16, %%rdi;"
-            //"pcmpestri $0x0, %%xmm2,%%xmm1;"
-            ".byte 0x66 ,0x0f ,0x3a ,0x61 ,0xca ,0x00;"
-            //"lea 16(%%rdi), %%rdi;"
-            "ja 1b;" //Res2==0:no match and zflag==0: s is not end
-            "jc 3f;" //Res2==1: match and s is not end
-
-            "mov $0x0, %%rax;" //no match
-            "jmp 0f;"
-
-            "3:"
-
-            "lea -16(%%rdi,%%rcx),%%rax;"
-
-            "0:"
-            //"mov %%rax, %0;"
-            : "=a"(ret), "=D"(s), "=S"(c), "=d"(n)
-            : "D"(s), "S"(c), "d"(n)
-            : "rcx", "memory", "xmm1", "xmm2");
-    return ret;
-}
-
-/*__attribute__((always_inline))*/ inline size_t strlen_sse(const char* s) {
-    //s:rdi
-    size_t ret;
-    __asm__ __volatile__(
-            "movq $-16, %%rax;"
-            //"xor %%rcx, %%rcx;"
-            "pxor %%xmm0, %%xmm0;"
-
-            "1:"
-            "movdqu (%%rdi), %%xmm1;"
-            "addq $16, %%rax;"
-            "addq $16, %%rdi;"
-            //"pcmpistri $0x8, %%xmm1,%%xmm0;"
-            ".byte 0x66, 0x0f, 0x3a, 0x63, 0xc1, 0x08;"
-            //"lea     16(%%rdi),    %%rdi  ;"
-            //"lea     16(%%rax),    %%rax  ;"
-            "jnz 1b;"
-
-            "addq %%rcx, %%rax;"
-            //"mov %%rax, %0;"
-            : "=a"(ret), "=D"(s)
-            : "D"(s)
-            : "rcx", "memory", "xmm0", "xmm1");
-    return ret;
-}
-
-/*__attribute__((always_inline))*/ inline int strcmp_sse(const char* s1, const char* s2)
-
-{
-    //s1:rdi; s2:rsi
-    int result;
-    __asm__ __volatile__(
-            "xor %%rax, %%rax ;"
-            //"xor %%rcx, %%rcx ;"
-
-            "1:"
-            "movdqu  (%%rdi), %%xmm1;"
-            "movdqu  (%%rsi), %%xmm2;"
-            "addq $16, %%rsi;"
-            "addq $16, %%rdi;"
-            //       " pcmpistri $0x18, %%xmm2, %%xmm1  ;"
-            ".byte 0x66 ,0x0f ,0x3a ,0x63 ,0xca ,0x18;"
-            " ja  1b                     ;"
-
-            "jnc  0f;"
-            "movzbq  -16(%%rsi, %%rcx), %%rdx   ;"
-            "movzbq  -16(%%rdi, %%rcx), %%rax   ;"
-            //      "sub     %%rdx, %%rax               ;"
-            "movl $1, %%ecx;"
-            "movl $-1, %%edi;"
-            "cmp  %%rdx, %%rax;"
-            "cmova %%ecx, %%eax;"
-            "cmovb %%edi, %%eax;"
-
-            "0:"
-            //"mov %%eax, %0;"
-
-            : "=a"(result), "=D"(s1), "=S"(s2)
-            : "D"(s1), "S"(s2)
-            : "rcx", "rdx", "memory", "xmm1", "xmm2");
-    return result;
-}
-
-/*__attribute__((always_inline))*/ inline int strncmp_sse(const char* s1, const char* s2, size_t n)
-
-{
-    //s1:rdi; s2:rsi; n:rdx
-    int result;
-    __asm__ __volatile__(
-            "cmp $1, %%rdx;"
-            "jbe 3f;"
-
-            "xor %%rax, %%rax ;"
-
-            "1:"
-            "movdqu  (%%rdi), %%xmm1;"
-            "movdqu  (%%rsi), %%xmm2;"
-            "addq $16, %%rdi;"
-            "addq $16, %%rsi;"
-            // " pcmpistri $0x18, %%xmm2, %%xmm1  ;"
-            ".byte 0x66 ,0x0f ,0x3a ,0x63 ,0xca ,0x18;"
-            //  "lea 16(%%rsi), %%rsi;"
-            //  "lea 16(%%rdi), %%rdi;"
-            "ja  2f                     ;" //both 16Byte data elements are valid and identical
-            "jnc  0f;"                     //Both 16byte data elements have EOS and identical
-
-            //the following situation is  Both 16byte data elements differ at offset X (ecx).
-
-            "cmp %%rdx, %%rcx;"
-            "jae 0f;" // X is out of n
-
-            "movzbq  -16(%%rsi, %%rcx), %%rdx   ;" // X is in the range of n
-            "movzbq  -16(%%rdi, %%rcx), %%rax   ;"
-            "subq     %%rdx, %%rax               ;"
-            "jmp 0f;"
-
-            "2:"
-            "subq $16, %%rdx;"
-            "jbe 0f;"
-            "ja 1b;"
-
-            "3:"
-            "xor %%eax, %%eax;"
-            "test %%rdx, %%rdx ;"
-            "jz 0f ;"
-            "movzbl (%%rdi), %%eax;"
-            "movzbl (%%rsi), %%edx;"
-            "subl %%edx, %%eax;"
-
-            "0:"
-            //  "mov %%eax, %0;"
-
-            : "=a"(result), "=D"(s1), "=S"(s2), "=d"(n)
-            : "D"(s1), "S"(s2), "d"(n)
-            : "rcx", "memory", "xmm1", "xmm2");
-    return result;
-}
-
-/*__attribute__((always_inline))*/ inline int baidu_crc32_byte(char const* src, int crc,
-                                                               int length) {
-    int crc_out;
-    __asm__ __volatile__(
-            "1:"
-            "movzbl (%%rdi), %%ecx;"
-            //"crc32b %%cl, %%esi;"
-            ".byte 0xf2, 0xf, 0x38, 0xf0, 0xf1;"
-
-            "add $1, %%rdi;"
-            "sub $1, %%edx;"
-            "jnz 1b;"
-            "movl %%esi,%%eax;"
-            : "=a"(crc_out), "=D"(src), "=S"(crc), "=d"(length)
-            : "D"(src), "S"(crc), "d"(length)
-            : "memory", "ecx");
-
-    return crc_out;
-}
-
-inline int crc32c_qw(char const* src, int crc, unsigned int qwlen) {
-    int crc_out;
-    __asm__ __volatile__(
-            "1:"
-            //      "crc32q (%%rdi), %%rsi;"
-            ".byte 0xf2 ,0x48 ,0x0f ,0x38 ,0xf1, 0x37;"
-
-            "addq $8, %%rdi;"
-            "subl $1, %%edx;"
-            "jnz 1b;"
-            "mov %%esi,%%eax;"
-            : "=a"(crc_out), "=D"(src), "=S"(crc), "=d"(qwlen)
-            : "D"(src), "S"(crc), "d"(qwlen)
-            : "memory");
-    return crc_out;
-}
-
-inline int baidu_crc32_qw(char const* src, int crc, unsigned int length) {
-    unsigned int iquotient = length >> 3;
-    unsigned int iremainder = length & 0x7;
-    char const* p;
-
-    if (iquotient) {
-        crc = crc32c_qw(src, crc, iquotient);
-    }
-
-    if (iremainder) {
-        p = src + (length - iremainder);
-        crc = baidu_crc32_byte(p, crc, iremainder);
-    }
-
-    return crc;
-}
-
-} // namespace doris
diff --git a/be/src/olap/byte_buffer.cpp b/be/src/olap/byte_buffer.cpp
deleted file mode 100644
index 8be7373754..0000000000
--- a/be/src/olap/byte_buffer.cpp
+++ /dev/null
@@ -1,204 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "byte_buffer.h"
-
-#include <sys/mman.h>
-
-#include "olap/utils.h"
-#include "runtime/thread_context.h"
-
-namespace doris {
-using namespace ErrorCode;
-
-StorageByteBuffer::StorageByteBuffer()
-        : _array(nullptr), _capacity(0), _limit(0), _position(0), _is_mmap(false) {}
-
-StorageByteBuffer::BufDeleter::BufDeleter() : _is_mmap(false), _mmap_length(0) {}
-
-void StorageByteBuffer::BufDeleter::set_mmap(size_t mmap_length) {
-    _is_mmap = true;
-    _mmap_length = mmap_length;
-}
-
-void StorageByteBuffer::BufDeleter::operator()(char* p) {
-    if (nullptr == p) {
-        return;
-    }
-
-    if (_is_mmap) {
-        if (0 != munmap(p, _mmap_length)) {
-            LOG(FATAL) << "fail to munmap: mem=" << p << ", len=" << _mmap_length
-                       << ", errno=" << Errno::no() << ", errno_str=" << Errno::str();
-        } else {
-            RELEASE_THREAD_MEM_TRACKER(_mmap_length);
-        }
-    } else {
-        delete[] p;
-    }
-}
-
-// 创建ByteBuffer与array
-StorageByteBuffer* StorageByteBuffer::create(uint64_t capacity) {
-    char* memory = new (std::nothrow) char[capacity];
-    StorageByteBuffer* buf = new (std::nothrow) StorageByteBuffer;
-
-    if (buf != nullptr && memory != nullptr) {
-        buf->_buf = std::shared_ptr<char>(memory, BufDeleter());
-        buf->_array = buf->_buf.get();
-        buf->_capacity = capacity;
-        buf->_limit = capacity;
-        return buf;
-    }
-
-    SAFE_DELETE(buf);
-    SAFE_DELETE_ARRAY(memory);
-    return nullptr;
-}
-
-StorageByteBuffer* StorageByteBuffer::reference_buffer(StorageByteBuffer* reference,
-                                                       uint64_t offset, uint64_t length) {
-    if (nullptr == reference || 0 == length) {
-        return nullptr;
-    }
-
-    if (offset + length > reference->capacity()) {
-        return nullptr;
-    }
-
-    StorageByteBuffer* buf = new (std::nothrow) StorageByteBuffer();
-
-    if (nullptr == buf) {
-        return nullptr;
-    }
-
-    buf->_buf = reference->_buf;
-    buf->_array = &(reference->_array[offset]);
-    buf->_capacity = length;
-    buf->_limit = length;
-    buf->_is_mmap = reference->_is_mmap;
-
-    return buf;
-}
-
-StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int prot, int flags,
-                                           int fd, uint64_t offset) {
-    CONSUME_THREAD_MEM_TRACKER(length);
-    char* memory = (char*)::mmap(start, length, prot, flags, fd, offset);
-
-    if (MAP_FAILED == memory) {
-        LOG(WARNING) << "fail to mmap. [errno='" << Errno::no() << "' errno_str='" << Errno::str()
-                     << "']";
-        RELEASE_THREAD_MEM_TRACKER(length);
-        return nullptr;
-    }
-
-    BufDeleter deleter;
-    deleter.set_mmap(length);
-
-    StorageByteBuffer* buf = new (std::nothrow) StorageByteBuffer();
-
-    if (nullptr == buf) {
-        deleter(memory);
-        LOG(WARNING) << "fail to allocate StorageByteBuffer.";
-        RELEASE_THREAD_MEM_TRACKER(length);
-        return nullptr;
-    }
-
-    buf->_buf = std::shared_ptr<char>(memory, deleter);
-    buf->_array = buf->_buf.get();
-    buf->_capacity = length;
-    buf->_limit = length;
-    buf->_is_mmap = true;
-    return buf;
-}
-
-StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* handler, uint64_t offset, int prot,
-                                           int flags) {
-    if (nullptr == handler) {
-        LOG(WARNING) << "invalid file handler";
-        return nullptr;
-    }
-
-    size_t length = handler->length();
-    int fd = handler->fd();
-    CONSUME_THREAD_MEM_TRACKER(length);
-    char* memory = (char*)::mmap(nullptr, length, prot, flags, fd, offset);
-
-    if (MAP_FAILED == memory) {
-        LOG(WARNING) << "fail to mmap. [errno='" << Errno::no() << "' errno_str='" << Errno::str()
-                     << "']";
-        RELEASE_THREAD_MEM_TRACKER(length);
-        return nullptr;
-    }
-
-    BufDeleter deleter;
-    deleter.set_mmap(length);
-
-    StorageByteBuffer* buf = new (std::nothrow) StorageByteBuffer();
-
-    if (nullptr == buf) {
-        deleter(memory);
-        LOG(WARNING) << "fail to allocate StorageByteBuffer.";
-        RELEASE_THREAD_MEM_TRACKER(length);
-        return nullptr;
-    }
-
-    buf->_buf = std::shared_ptr<char>(memory, deleter);
-    buf->_array = buf->_buf.get();
-    buf->_capacity = length;
-    buf->_limit = length;
-    buf->_is_mmap = true;
-    return buf;
-}
-
-Status StorageByteBuffer::put(char src) {
-    if (_position < _limit) {
-        _array[_position++] = src;
-        return Status::OK();
-    }
-
-    return Status::Error<BUFFER_OVERFLOW>();
-}
-
-Status StorageByteBuffer::put(uint64_t index, char src) {
-    if (index < _limit) {
-        _array[index] = src;
-        return Status::OK();
-    }
-
-    return Status::Error<BUFFER_OVERFLOW>();
-}
-
-Status StorageByteBuffer::put(const char* src, uint64_t src_size, uint64_t offset,
-                              uint64_t length) {
-    //没有足够的空间可以写
-    if (length > remaining()) {
-        return Status::Error<BUFFER_OVERFLOW>();
-    }
-
-    //src不够大
-    if (offset + length > src_size) {
-        return Status::Error<OUT_OF_BOUND>();
-    }
-
-    memory_copy(&_array[_position], &src[offset], length);
-    _position += length;
-    return Status::OK();
-}
-
-} // namespace doris
diff --git a/be/src/olap/byte_buffer.h b/be/src/olap/byte_buffer.h
deleted file mode 100644
index a0fe460243..0000000000
--- a/be/src/olap/byte_buffer.h
+++ /dev/null
@@ -1,210 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "olap/file_helper.h"
-#include "olap/olap_define.h"
-#include "util/mem_util.hpp"
-
-namespace doris {
-
-// ByteBuffer is a class used for data caching
-// ByteBuffer maintains an internal char array for caching data;
-// ByteBuffer maintains internal Pointers for reading and writing data;
-//
-// ByteBuffer has the following important usage concepts:
-// capacity - the capacity of the buffer, set at initialization, is the size of the internal char array
-// position - the current internal pointer position
-// limit - maximum usage limit, this value is less than or equal to capacity, position is always less than limit
-//
-// ByteBuffer supports safe shallow copying of data directly using the copy constructor or = operator
-class StorageByteBuffer {
-public:
-    // Create a StorageByteBuffer of capacity with the new method.
-    // The position of the new buffer is 0, and the limit is capacity
-    // The caller obtains the ownership of the newly created ByteBuffer, and needs to use delete method to delete the obtained StorageByteBuffer
-    //
-    // TODO. I think the use of create here should directly return the ByteBuffer itself instead of the smart pointer,
-    // otherwise the smart pointer will not work,
-    // and the current memory management is still manual.and need to think delete.
-    static StorageByteBuffer* create(uint64_t capacity);
-
-    // Create a new StorageByteBuffer by referencing another ByteBuffer's memory
-    // The position of the new buffer is 0, and the limit is length
-    // The caller obtains the ownership of the newly created ByteBuffer, and needs to use delete method to delete the obtained StorageByteBuffer
-    // Inputs:
-    //   - reference referenced memory
-    //   - offset The position of the referenced Buffer in the original ByteBuffer, i.e.&reference->array()[offset]
-    //   - length The length of the referenced Buffer
-    // Notes:
-    //   offset + length < reference->capacity
-    //
-    // TODO. same as create
-    static StorageByteBuffer* reference_buffer(StorageByteBuffer* reference, uint64_t offset,
-                                               uint64_t length);
-
-    // Create a ByteBuffer through mmap, and the memory after successful mmap is managed by ByteBuffer
-    // start, length, prot, flags, fd, offset are all parameters of mmap function
-    // The caller obtains the ownership of the newly created ByteBuffer, and needs to use delete method to delete the obtained StorageByteBuffer
-    static StorageByteBuffer* mmap(void* start, uint64_t length, int prot, int flags, int fd,
-                                   uint64_t offset);
-
-    // Since olap files are encapsulated with FileHandler, the interface is slightly modified
-    // and the omitted parameters can be obtained in the handler.
-    // The old interface is still preserved, maybe it will be used?
-    static StorageByteBuffer* mmap(FileHandler* handler, uint64_t offset, int prot, int flags);
-
-    uint64_t capacity() const { return _capacity; }
-
-    uint64_t position() const { return _position; }
-    // Set the position of the internal pointer
-    // If the new position is greater than or equal to limit, return Status::Error<ErrorCode::INVALID_ARGUMENT>()
-    Status set_position(uint64_t new_position) {
-        if (new_position <= _limit) {
-            _position = new_position;
-            return Status::OK();
-        } else {
-            return Status::Error<ErrorCode::INVALID_ARGUMENT>();
-        }
-    }
-
-    uint64_t limit() const { return _limit; }
-    //set new limit
-    //If limit is greater than capacity, return Status::Error<ErrorCode::INVALID_ARGUMENT>()
-    //If position is greater than the new limit, set position equal to limit
-    Status set_limit(uint64_t new_limit) {
-        if (new_limit > _capacity) {
-            return Status::Error<ErrorCode::INVALID_ARGUMENT>();
-        }
-
-        _limit = new_limit;
-
-        if (_position > _limit) {
-            _position = _limit;
-        }
-
-        return Status::OK();
-    }
-
-    uint64_t remaining() const { return _limit - _position; }
-
-    // Set limit to current position
-    // set position to 0
-    // This function can be used to change the ByteBuffer from the write state to the read state,
-    //  that is, call this function after some writes, and then read the ByteBuffer.
-    void flip() {
-        _limit = _position;
-        _position = 0;
-    }
-
-    // The following three read functions are inline optimized
-
-    // Read one byte of data, increase position after completion
-    Status get(char* result) {
-        if (OLAP_LIKELY(_position < _limit)) {
-            *result = _array[_position++];
-            return Status::OK();
-        } else {
-            return Status::Error<ErrorCode::OUT_OF_BOUND>();
-        }
-    }
-
-    // Read one byte of data at the specified location
-    Status get(uint64_t index, char* result) {
-        if (OLAP_LIKELY(index < _limit)) {
-            *result = _array[index];
-            return Status::OK();
-        } else {
-            return Status::Error<ErrorCode::OUT_OF_BOUND>();
-        }
-    }
-
-    // Read a piece of data of length length to dst, and increase the position after completion
-    Status get(char* dst, uint64_t dst_size, uint64_t length) {
-        // Not enough data to read
-        if (OLAP_UNLIKELY(length > remaining())) {
-            return Status::Error<ErrorCode::OUT_OF_BOUND>();
-        }
-
-        // dst is not big enough
-        if (OLAP_UNLIKELY(length > dst_size)) {
-            return Status::Error<ErrorCode::BUFFER_OVERFLOW>();
-        }
-
-        memory_copy(dst, &_array[_position], length);
-        _position += length;
-        return Status::OK();
-    }
-
-    // Read dst_size long data to dst
-    Status get(char* dst, uint64_t dst_size) { return get(dst, dst_size, dst_size); }
-
-    // Write a byte, increment position when done
-    // If position >= limit before writing, return Status::Error<ErrorCode::BUFFER_OVERFLOW>()
-    Status put(char src);
-
-    // Write data at the index position without changing the position
-    // Returns:
-    //   Status::Error<ErrorCode::BUFFER_OVERFLOW>() : index >= limit
-    Status put(uint64_t index, char src);
-
-    // Read length bytes from &src[offset], write to buffer, and increase position after completion
-    // Returns:
-    //   Status::Error<ErrorCode::BUFFER_OVERFLOW>(): remaining() < length
-    //   Status::Error<ErrorCode::OUT_OF_BOUND>(): offset + length > src_size
-    Status put(const char* src, uint64_t src_size, uint64_t offset, uint64_t length);
-
-    // write a set of data
-    Status put(const char* src, uint64_t src_size) { return put(src, src_size, 0, src_size); }
-
-    // Returns the char array inside the ByteBuffer
-    const char* array() const { return _array; }
-    const char* array(size_t position) const {
-        return position >= _limit ? nullptr : &_array[position];
-    }
-    char* array() { return _array; }
-
-private:
-    // A custom destructor class that supports destructing the memory of new[] and mmap
-    // Use delete to release by default
-    class BufDeleter {
-    public:
-        BufDeleter();
-        // Set to use mmap method
-        void set_mmap(size_t mmap_length);
-        void operator()(char* p);
-
-    private:
-        bool _is_mmap;       // whether to use mmap
-        size_t _mmap_length; // If mmap is used, record the length of mmap
-    };
-
-private:
-    // Direct creation of ByteBuffer is not supported, but created through the create method
-    StorageByteBuffer();
-
-private:
-    std::shared_ptr<char> _buf; // managed memory
-    char* _array;
-    uint64_t _capacity;
-    uint64_t _limit;
-    uint64_t _position;
-    bool _is_mmap;
-};
-
-} // namespace doris
diff --git a/be/src/olap/compress.cpp b/be/src/olap/compress.cpp
deleted file mode 100644
index db07a87812..0000000000
--- a/be/src/olap/compress.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "compress.h"
-
-#include "olap/byte_buffer.h"
-#include "olap/utils.h"
-
-namespace doris {
-
-#ifdef DORIS_WITH_LZO
-Status lzo_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller) {
-    size_t out_length = 0;
-    Status res = Status::OK();
-    *smaller = false;
-    res = olap_compress(&(in->array()[in->position()]), in->remaining(),
-                        &(out->array()[out->position()]), out->remaining(), &out_length,
-                        OLAP_COMP_STORAGE);
-
-    if (res.ok()) {
-        if (out_length < in->remaining()) {
-            *smaller = true;
-            out->set_position(out->position() + out_length);
-        }
-    }
-
-    return res;
-}
-
-Status lzo_decompress(StorageByteBuffer* in, StorageByteBuffer* out) {
-    size_t out_length = 0;
-    Status res = Status::OK();
-    res = olap_decompress(&(in->array()[in->position()]), in->remaining(),
-                          &(out->array()[out->position()]), out->remaining(), &out_length,
-                          OLAP_COMP_STORAGE);
-
-    if (res.ok()) {
-        out->set_limit(out_length);
-    }
-
-    return res;
-}
-#endif
-
-Status lz4_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller) {
-    size_t out_length = 0;
-    Status res = Status::OK();
-    *smaller = false;
-    res = olap_compress(&(in->array()[in->position()]), in->remaining(),
-                        &(out->array()[out->position()]), out->remaining(), &out_length,
-                        OLAP_COMP_LZ4);
-
-    if (res.ok()) {
-        if (out_length < in->remaining()) {
-            *smaller = true;
-            out->set_position(out->position() + out_length);
-        }
-    }
-
-    return res;
-}
-
-Status lz4_decompress(StorageByteBuffer* in, StorageByteBuffer* out) {
-    size_t out_length = 0;
-    Status res = Status::OK();
-    res = olap_decompress(&(in->array()[in->position()]), in->remaining(),
-                          &(out->array()[out->position()]), out->remaining(), &out_length,
-                          OLAP_COMP_LZ4);
-
-    if (res.ok()) {
-        out->set_limit(out_length);
-    }
-
-    return res;
-}
-
-} // namespace doris
diff --git a/be/src/olap/compress.h b/be/src/olap/compress.h
deleted file mode 100644
index 255d5a415c..0000000000
--- a/be/src/olap/compress.h
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "olap/olap_define.h"
-
-namespace doris {
-
-class StorageByteBuffer;
-
-// Define a compression function to compress the remaining memory in the input buffer
-// and save it to the remaining space in the output buffer
-// Inputs:
-//     in - input buffer,Compress memory from position to limit
-//     out - output buffer,The space from position to limit can be used to store data
-//     smaller - Whether the compressed data size is smaller than the data size before compression
-// Returns:
-//     Status::Error<BUFFER_OVERFLOW>() - Insufficient space left in output buffer
-//     Status::Error<COMPRESS_ERROR>() - Compression error
-typedef Status (*Compressor)(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller);
-
-// Define a decompression function to decompress the remaining memory in the input buffer
-// and save it to the remaining space in the output buffer
-// Inputs:
-//     in - input buffer,Decompress memory from position to limit
-//     out - output buffer,The space from position to limit can be used to store data
-// Returns:
-//     Status::Error<BUFFER_OVERFLOW>() - Insufficient space left in output buffer
-//     Status::Error<DECOMPRESS_ERROR>() - decompression error
-typedef Status (*Decompressor)(StorageByteBuffer* in, StorageByteBuffer* out);
-
-#ifdef DORIS_WITH_LZO
-Status lzo_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller);
-Status lzo_decompress(StorageByteBuffer* in, StorageByteBuffer* out);
-#endif
-
-Status lz4_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller);
-Status lz4_decompress(StorageByteBuffer* in, StorageByteBuffer* out);
-
-} // namespace doris
diff --git a/be/src/olap/utils.cpp b/be/src/olap/utils.cpp
index c85e1e5205..58abdeb62b 100644
--- a/be/src/olap/utils.cpp
+++ b/be/src/olap/utils.cpp
@@ -542,18 +542,6 @@ unsigned int crc32c_lut(char const* b, unsigned int off, unsigned int len, unsig
     return localCrc;
 }
 
-uint32_t olap_crc32(uint32_t crc32, const char* buf, size_t len) {
-#if defined(__i386) || defined(__x86_64__)
-    if (OLAP_LIKELY(CpuInfo::is_supported(CpuInfo::SSE4_2))) {
-        return baidu_crc32_qw(buf, crc32, len);
-    } else {
-        return crc32c_lut(buf, 0, len, crc32);
-    }
-#else
-    return crc32c_lut(buf, 0, len, crc32);
-#endif
-}
-
 Status gen_timestamp_string(string* out_string) {
     time_t now = time(nullptr);
     tm local_tm;
diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h
index 9c1a20d767..e9a542eb8e 100644
--- a/be/src/olap/utils.h
+++ b/be/src/olap/utils.h
@@ -35,9 +35,6 @@
 #include <vector>
 
 #include "common/logging.h"
-#if defined(__i386) || defined(__x86_64__)
-#include "olap/bhp_lib.h"
-#endif
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 
@@ -126,26 +123,9 @@ Status olap_decompress(const char* src_buf, size_t src_len, char* dest_buf, size
 #define ADLER32_INIT adler32(0L, Z_NULL, 0)
 uint32_t olap_adler32(uint32_t adler, const char* buf, size_t len);
 
-// CRC32仅仅用在RowBlock的校验,性能优异
-#define CRC32_INIT 0xFFFFFFFF
-uint32_t olap_crc32(uint32_t crc32, const char* buf, size_t len);
-
 // 获取系统当前时间,并将时间转换为字符串
 Status gen_timestamp_string(std::string* out_string);
 
-enum ComparatorEnum {
-    COMPARATOR_LESS = 0,
-    COMPARATOR_LARGER = 1,
-};
-
-// 处理comparator functor处理过程中出现的错误
-class ComparatorException : public std::exception {
-public:
-    virtual const char* what() const throw() {
-        return "exception happens when doing binary search.";
-    }
-};
-
 // iterator offset,用于二分查找
 using iterator_offset_t = size_t;
 
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index bc79e36506..f78892fa0d 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -24,7 +24,6 @@
 
 #include "common/logging.h"
 #include "common/utils.h"
-#include "exec/arrow/orc_reader.h"
 #include "exec/text_converter.hpp"
 #include "olap/iterators.h"
 #include "runtime/descriptors.h"
diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h
index 5779dbb372..2dcc476df9 100644
--- a/be/src/vec/exec/varrow_scanner.h
+++ b/be/src/vec/exec/varrow_scanner.h
@@ -19,7 +19,6 @@
 
 #include <arrow/array.h>
 #include <exec/arrow/arrow_reader.h>
-#include <exec/arrow/orc_reader.h>
 
 #include <map>
 #include <memory>
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 5f7340263f..70691e1c43 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -54,15 +54,12 @@ set(EXPRS_TEST_FILES
     exprs/json_function_test.cpp
     exprs/string_functions_test.cpp
     exprs/timestamp_functions_test.cpp
-    exprs/percentile_approx_test.cpp
-    exprs/percentile_test.cpp
     exprs/bitmap_function_test.cpp
     exprs/encryption_functions_test.cpp
     exprs/math_functions_test.cpp
     exprs/topn_function_test.cpp
     exprs/bloom_filter_predicate_test.cpp
     exprs/quantile_function_test.cpp
-    exprs/window_funnel_test.cpp
     exprs/hash_function_test.cpp
 )
 set(GEO_TEST_FILES
@@ -91,7 +88,6 @@ set(OLAP_TEST_FILES
     olap/tablet_schema_helper.cpp
     olap/delta_writer_test.cpp
     olap/delete_handler_test.cpp
-    olap/byte_buffer_test.cpp
     olap/lru_cache_test.cpp
     olap/bloom_filter_test.cpp
     olap/itoken_extractor_test.cpp
diff --git a/be/test/exprs/bitmap_function_test.cpp b/be/test/exprs/bitmap_function_test.cpp
index aa3150995d..84aa1e3e8d 100644
--- a/be/test/exprs/bitmap_function_test.cpp
+++ b/be/test/exprs/bitmap_function_test.cpp
@@ -24,7 +24,6 @@
 #include <string>
 #include <vector>
 
-#include "exprs/aggregate_functions.h"
 #include "exprs/anyval_util.h"
 #include "testutil/function_utils.h"
 #include "util/bitmap_intersect.h"
diff --git a/be/test/exprs/percentile_approx_test.cpp b/be/test/exprs/percentile_approx_test.cpp
deleted file mode 100644
index b7f4e51d01..0000000000
--- a/be/test/exprs/percentile_approx_test.cpp
+++ /dev/null
@@ -1,142 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include "exprs/aggregate_functions.h"
-#include "testutil/function_utils.h"
-
-namespace doris {
-
-class PercentileApproxTest : public testing::Test {
-public:
-    PercentileApproxTest() {}
-};
-
-TEST_F(PercentileApproxTest, testSample) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    DoubleVal doubleQ(0.9);
-
-    StringVal stringVal1;
-    DoubleVal int1(1);
-    AggregateFunctions::percentile_approx_init(context, &stringVal1);
-    AggregateFunctions::percentile_approx_update(context, int1, doubleQ, &stringVal1);
-    DoubleVal int2(2);
-    AggregateFunctions::percentile_approx_update(context, int2, doubleQ, &stringVal1);
-
-    StringVal s = AggregateFunctions::percentile_approx_serialize(context, stringVal1);
-
-    StringVal stringVal2;
-    AggregateFunctions::percentile_approx_init(context, &stringVal2);
-    AggregateFunctions::percentile_approx_merge(context, s, &stringVal2);
-    DoubleVal v = AggregateFunctions::percentile_approx_finalize(context, stringVal2);
-    EXPECT_EQ(v.val, 2);
-    delete futil;
-}
-
-TEST_F(PercentileApproxTest, testNoMerge) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    DoubleVal doubleQ(0.9);
-
-    StringVal stringVal1;
-    DoubleVal val(1);
-    AggregateFunctions::percentile_approx_init(context, &stringVal1);
-    AggregateFunctions::percentile_approx_update(context, val, doubleQ, &stringVal1);
-    DoubleVal val2(2);
-    AggregateFunctions::percentile_approx_update(context, val2, doubleQ, &stringVal1);
-
-    DoubleVal v = AggregateFunctions::percentile_approx_finalize(context, stringVal1);
-    EXPECT_EQ(v.val, 2);
-    delete futil;
-}
-
-TEST_F(PercentileApproxTest, testSerialize) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    DoubleVal doubleQ(0.999);
-    StringVal stringVal;
-    AggregateFunctions::percentile_approx_init(context, &stringVal);
-
-    for (int i = 1; i <= 100000; i++) {
-        DoubleVal val(i);
-        AggregateFunctions::percentile_approx_update(context, val, doubleQ, &stringVal);
-    }
-    StringVal serialized = AggregateFunctions::percentile_approx_serialize(context, stringVal);
-
-    // mock serialize
-    StringVal stringVal2;
-    AggregateFunctions::percentile_approx_init(context, &stringVal2);
-    AggregateFunctions::percentile_approx_merge(context, serialized, &stringVal2);
-    DoubleVal v = AggregateFunctions::percentile_approx_finalize(context, stringVal2);
-    EXPECT_DOUBLE_EQ(v.val, 99900.5);
-
-    // merge init percentile stringVal3 should not change the correct result
-    AggregateFunctions::percentile_approx_init(context, &stringVal);
-
-    for (int i = 1; i <= 100000; i++) {
-        DoubleVal val(i);
-        AggregateFunctions::percentile_approx_update(context, val, doubleQ, &stringVal);
-    }
-    serialized = AggregateFunctions::percentile_approx_serialize(context, stringVal);
-
-    StringVal stringVal3;
-    AggregateFunctions::percentile_approx_init(context, &stringVal2);
-    AggregateFunctions::percentile_approx_init(context, &stringVal3);
-    StringVal serialized2 = AggregateFunctions::percentile_approx_serialize(context, stringVal3);
-
-    AggregateFunctions::percentile_approx_merge(context, serialized, &stringVal2);
-    AggregateFunctions::percentile_approx_merge(context, serialized2, &stringVal2);
-    v = AggregateFunctions::percentile_approx_finalize(context, stringVal2);
-    EXPECT_DOUBLE_EQ(v.val, 99900.5);
-
-    delete futil;
-}
-
-TEST_F(PercentileApproxTest, testNullVale) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    DoubleVal doubleQ(0.999);
-    StringVal stringVal;
-    AggregateFunctions::percentile_approx_init(context, &stringVal);
-
-    for (int i = 1; i <= 100000; i++) {
-        if (i % 3 == 0) {
-            AggregateFunctions::percentile_approx_update(context, DoubleVal::null(), doubleQ,
-                                                         &stringVal);
-        } else {
-            AggregateFunctions::percentile_approx_update(context, DoubleVal(i), doubleQ,
-                                                         &stringVal);
-        }
-    }
-    StringVal serialized = AggregateFunctions::percentile_approx_serialize(context, stringVal);
-
-    // mock serialize
-    StringVal stringVal2;
-    AggregateFunctions::percentile_approx_init(context, &stringVal2);
-    AggregateFunctions::percentile_approx_merge(context, serialized, &stringVal2);
-    DoubleVal v = AggregateFunctions::percentile_approx_finalize(context, stringVal2);
-    EXPECT_FLOAT_EQ(v.val, 99900.665999999997);
-    delete futil;
-}
-
-} // namespace doris
diff --git a/be/test/exprs/percentile_test.cpp b/be/test/exprs/percentile_test.cpp
deleted file mode 100644
index bae6774365..0000000000
--- a/be/test/exprs/percentile_test.cpp
+++ /dev/null
@@ -1,114 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include "exprs/aggregate_functions.h"
-#include "testutil/function_utils.h"
-
-namespace doris {
-
-class PercentileTest : public testing::Test {
-public:
-    PercentileTest() {}
-};
-
-TEST_F(PercentileTest, testSample) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    DoubleVal doubleQ(0.9);
-
-    StringVal stringVal1;
-    BigIntVal int1(1);
-    AggregateFunctions::percentile_init(context, &stringVal1);
-    AggregateFunctions::percentile_update(context, int1, doubleQ, &stringVal1);
-    BigIntVal int2(2);
-    AggregateFunctions::percentile_update(context, int2, doubleQ, &stringVal1);
-
-    StringVal s = AggregateFunctions::percentile_serialize(context, stringVal1);
-
-    StringVal stringVal2;
-    AggregateFunctions::percentile_init(context, &stringVal2);
-    AggregateFunctions::percentile_merge(context, s, &stringVal2);
-    DoubleVal v = AggregateFunctions::percentile_finalize(context, stringVal2);
-    EXPECT_EQ(v.val, 1.9);
-    delete futil;
-}
-
-TEST_F(PercentileTest, testNoMerge) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    DoubleVal doubleQ(0.9);
-
-    StringVal stringVal1;
-    BigIntVal val(1);
-    AggregateFunctions::percentile_init(context, &stringVal1);
-    AggregateFunctions::percentile_update(context, val, doubleQ, &stringVal1);
-    BigIntVal val2(2);
-    AggregateFunctions::percentile_update(context, val2, doubleQ, &stringVal1);
-
-    DoubleVal v = AggregateFunctions::percentile_finalize(context, stringVal1);
-    EXPECT_EQ(v.val, 1.9);
-    delete futil;
-}
-
-TEST_F(PercentileTest, testSerialize) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    DoubleVal doubleQ(0.999);
-    StringVal stringVal;
-    AggregateFunctions::percentile_init(context, &stringVal);
-
-    for (int i = 1; i <= 100000; i++) {
-        BigIntVal val(i);
-        AggregateFunctions::percentile_update(context, val, doubleQ, &stringVal);
-    }
-    StringVal serialized = AggregateFunctions::percentile_serialize(context, stringVal);
-
-    // mock serialize
-    StringVal stringVal2;
-    AggregateFunctions::percentile_init(context, &stringVal2);
-    AggregateFunctions::percentile_merge(context, serialized, &stringVal2);
-    DoubleVal v = AggregateFunctions::percentile_finalize(context, stringVal2);
-    EXPECT_DOUBLE_EQ(v.val, 99900.001);
-
-    // merge init percentile stringVal3 should not change the correct result
-    AggregateFunctions::percentile_init(context, &stringVal);
-
-    for (int i = 1; i <= 100000; i++) {
-        BigIntVal val(i);
-        AggregateFunctions::percentile_update(context, val, doubleQ, &stringVal);
-    }
-    serialized = AggregateFunctions::percentile_serialize(context, stringVal);
-
-    StringVal stringVal3;
-    AggregateFunctions::percentile_init(context, &stringVal2);
-    AggregateFunctions::percentile_init(context, &stringVal3);
-    StringVal serialized2 = AggregateFunctions::percentile_serialize(context, stringVal3);
-
-    AggregateFunctions::percentile_merge(context, serialized, &stringVal2);
-    AggregateFunctions::percentile_merge(context, serialized2, &stringVal2);
-    v = AggregateFunctions::percentile_finalize(context, stringVal2);
-    EXPECT_DOUBLE_EQ(v.val, 99900.001);
-
-    delete futil;
-}
-
-} // namespace doris
diff --git a/be/test/exprs/window_funnel_test.cpp b/be/test/exprs/window_funnel_test.cpp
deleted file mode 100644
index 04449438dd..0000000000
--- a/be/test/exprs/window_funnel_test.cpp
+++ /dev/null
@@ -1,425 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include "common/logging.h"
-#include "exprs/aggregate_functions.h"
-#include "runtime/datetime_value.h"
-#include "testutil/function_utils.h"
-
-namespace doris {
-
-class WindowFunnelTest : public testing::Test {
-public:
-    WindowFunnelTest() {}
-};
-
-TEST_F(WindowFunnelTest, testMax4SortedNoMerge) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    const int NUM_CONDS = 4;
-    for (int i = -1; i < NUM_CONDS + 4; i++) {
-        StringVal stringVal1;
-        BigIntVal window(i);
-        StringVal mode("default");
-        std::vector<doris_udf::AnyVal*> constant_args;
-        constant_args.emplace_back(&window);
-        constant_args.emplace_back(&mode);
-        context->impl()->set_constant_args(std::move(constant_args));
-
-        AggregateFunctions::window_funnel_init(context, &stringVal1);
-
-        DateTimeVal timestamp;
-        DateTimeValue time_value;
-        time_value.set_time(2020, 2, 28, 0, 0, 1, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds[NUM_CONDS] = {true, false, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS, conds,
-                                                 &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 2, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds1[NUM_CONDS] = {false, true, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds1, &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 3, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds2[NUM_CONDS] = {false, false, true, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds2, &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 4, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds3[NUM_CONDS] = {false, false, false, true};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds3, &stringVal1);
-
-        IntVal v = AggregateFunctions::window_funnel_finalize(context, stringVal1);
-        LOG(INFO) << "event num: " << NUM_CONDS << " window: " << window.val;
-        EXPECT_EQ(v.val, i < 0 ? 1 : (i < NUM_CONDS ? i + 1 : NUM_CONDS));
-    }
-    delete futil;
-}
-
-TEST_F(WindowFunnelTest, testMax4SortedMerge) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    const int NUM_CONDS = 4;
-    for (int i = -1; i < NUM_CONDS + 4; i++) {
-        StringVal stringVal1;
-        BigIntVal window(i);
-        StringVal mode("default");
-        std::vector<doris_udf::AnyVal*> constant_args;
-        constant_args.emplace_back(&window);
-        constant_args.emplace_back(&mode);
-        context->impl()->set_constant_args(std::move(constant_args));
-
-        AggregateFunctions::window_funnel_init(context, &stringVal1);
-
-        DateTimeVal timestamp;
-        DateTimeValue time_value;
-        time_value.set_time(2020, 2, 28, 0, 0, 1, 0);
-        time_value.to_datetime_val(&timestamp);
-
-        BooleanVal conds[NUM_CONDS] = {true, false, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS, conds,
-                                                 &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 2, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds1[NUM_CONDS] = {false, true, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds1, &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 3, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds2[NUM_CONDS] = {false, false, true, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds2, &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 4, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds3[NUM_CONDS] = {false, false, false, true};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds3, &stringVal1);
-
-        StringVal s = AggregateFunctions::window_funnel_serialize(context, stringVal1);
-
-        StringVal stringVal2;
-        AggregateFunctions::window_funnel_init(context, &stringVal2);
-        AggregateFunctions::window_funnel_merge(context, s, &stringVal2);
-        IntVal v = AggregateFunctions::window_funnel_finalize(context, stringVal2);
-        LOG(INFO) << "event num: " << NUM_CONDS << " window: " << window.val;
-        EXPECT_EQ(v.val, i < 0 ? 1 : (i < NUM_CONDS ? i + 1 : NUM_CONDS));
-    }
-    delete futil;
-}
-
-TEST_F(WindowFunnelTest, testMax4ReverseSortedNoMerge) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    const int NUM_CONDS = 4;
-    for (int i = -1; i < NUM_CONDS + 4; i++) {
-        StringVal stringVal1;
-        BigIntVal window(i);
-        StringVal mode("default");
-        std::vector<doris_udf::AnyVal*> constant_args;
-        constant_args.emplace_back(&window);
-        constant_args.emplace_back(&mode);
-        context->impl()->set_constant_args(std::move(constant_args));
-
-        AggregateFunctions::window_funnel_init(context, &stringVal1);
-
-        DateTimeVal timestamp;
-        DateTimeValue time_value;
-        time_value.set_time(2020, 2, 28, 0, 0, 3, 0);
-        time_value.to_datetime_val(&timestamp);
-
-        BooleanVal conds[NUM_CONDS] = {true, false, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS, conds,
-                                                 &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 2, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds1[NUM_CONDS] = {false, true, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds1, &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 1, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds2[NUM_CONDS] = {false, false, true, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds2, &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 0, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds3[NUM_CONDS] = {false, false, false, true};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds3, &stringVal1);
-
-        IntVal v = AggregateFunctions::window_funnel_finalize(context, stringVal1);
-        LOG(INFO) << "event num: " << NUM_CONDS << " window: " << window.val;
-        EXPECT_EQ(v.val, 1);
-    }
-    delete futil;
-}
-
-TEST_F(WindowFunnelTest, testMax4ReverseSortedMerge) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    const int NUM_CONDS = 4;
-    for (int i = -1; i < NUM_CONDS + 4; i++) {
-        StringVal stringVal1;
-        BigIntVal window(i);
-        StringVal mode("default");
-        std::vector<doris_udf::AnyVal*> constant_args;
-        constant_args.emplace_back(&window);
-        constant_args.emplace_back(&mode);
-        context->impl()->set_constant_args(std::move(constant_args));
-
-        AggregateFunctions::window_funnel_init(context, &stringVal1);
-
-        DateTimeVal timestamp;
-        DateTimeValue time_value;
-        time_value.set_time(2020, 2, 28, 0, 0, 3, 0);
-        time_value.to_datetime_val(&timestamp);
-
-        BooleanVal conds[NUM_CONDS] = {true, false, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS, conds,
-                                                 &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 2, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds1[NUM_CONDS] = {false, true, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds1, &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 1, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds2[NUM_CONDS] = {false, false, true, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds2, &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 0, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds3[NUM_CONDS] = {false, false, false, true};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds3, &stringVal1);
-
-        StringVal s = AggregateFunctions::window_funnel_serialize(context, stringVal1);
-
-        StringVal stringVal2;
-        AggregateFunctions::window_funnel_init(context, &stringVal2);
-        AggregateFunctions::window_funnel_merge(context, s, &stringVal2);
-        IntVal v = AggregateFunctions::window_funnel_finalize(context, stringVal2);
-        LOG(INFO) << "event num: " << NUM_CONDS << " window: " << window.val;
-        EXPECT_EQ(v.val, 1);
-    }
-    delete futil;
-}
-
-TEST_F(WindowFunnelTest, testMax4DuplicateSortedNoMerge) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    const int NUM_CONDS = 4;
-    for (int i = -1; i < NUM_CONDS + 4; i++) {
-        StringVal stringVal1;
-        BigIntVal window(i);
-        StringVal mode("default");
-        std::vector<doris_udf::AnyVal*> constant_args;
-        constant_args.emplace_back(&window);
-        constant_args.emplace_back(&mode);
-        context->impl()->set_constant_args(std::move(constant_args));
-
-        AggregateFunctions::window_funnel_init(context, &stringVal1);
-
-        DateTimeVal timestamp;
-        DateTimeValue time_value;
-        time_value.set_time(2020, 2, 28, 0, 0, 0, 0);
-        time_value.to_datetime_val(&timestamp);
-
-        BooleanVal conds[NUM_CONDS] = {true, false, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS, conds,
-                                                 &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 1, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds1[NUM_CONDS] = {false, true, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds1, &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 2, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds2[NUM_CONDS] = {true, false, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds2, &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 3, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds3[NUM_CONDS] = {false, false, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds3, &stringVal1);
-
-        IntVal v = AggregateFunctions::window_funnel_finalize(context, stringVal1);
-        LOG(INFO) << "event num: " << NUM_CONDS << " window: " << window.val;
-        EXPECT_EQ(v.val, i < 0 ? 1 : (i < 2 ? i + 1 : 2));
-    }
-    delete futil;
-}
-
-TEST_F(WindowFunnelTest, testMax4DuplicateSortedMerge) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    const int NUM_CONDS = 4;
-    for (int i = -1; i < NUM_CONDS + 4; i++) {
-        StringVal stringVal1;
-        BigIntVal window(i);
-        StringVal mode("default");
-        std::vector<doris_udf::AnyVal*> constant_args;
-        constant_args.emplace_back(&window);
-        constant_args.emplace_back(&mode);
-        context->impl()->set_constant_args(std::move(constant_args));
-
-        AggregateFunctions::window_funnel_init(context, &stringVal1);
-
-        DateTimeVal timestamp;
-        DateTimeValue time_value;
-        time_value.set_time(2020, 2, 28, 0, 0, 0, 0);
-        time_value.to_datetime_val(&timestamp);
-
-        BooleanVal conds[NUM_CONDS] = {true, false, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS, conds,
-                                                 &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 1, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds1[NUM_CONDS] = {false, true, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds1, &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 2, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds2[NUM_CONDS] = {true, false, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds2, &stringVal1);
-
-        time_value.set_time(2020, 2, 28, 0, 0, 3, 0);
-        time_value.to_datetime_val(&timestamp);
-        BooleanVal conds3[NUM_CONDS] = {false, false, false, false};
-        AggregateFunctions::window_funnel_update(context, window, mode, timestamp, NUM_CONDS,
-                                                 conds3, &stringVal1);
-
-        StringVal s = AggregateFunctions::window_funnel_serialize(context, stringVal1);
-
-        StringVal stringVal2;
-        AggregateFunctions::window_funnel_init(context, &stringVal2);
-        AggregateFunctions::window_funnel_merge(context, s, &stringVal2);
-        IntVal v = AggregateFunctions::window_funnel_finalize(context, stringVal2);
-        LOG(INFO) << "event num: " << NUM_CONDS << " window: " << window.val;
-        EXPECT_EQ(v.val, i < 0 ? 1 : (i < 2 ? i + 1 : 2));
-    }
-    delete futil;
-}
-
-TEST_F(WindowFunnelTest, testNoMatchedEvent) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    StringVal stringVal1;
-    BigIntVal window(0);
-    StringVal mode("default");
-    std::vector<doris_udf::AnyVal*> constant_args;
-    constant_args.emplace_back(&window);
-    constant_args.emplace_back(&mode);
-    context->impl()->set_constant_args(std::move(constant_args));
-
-    AggregateFunctions::window_funnel_init(context, &stringVal1);
-
-    DateTimeVal timestamp;
-    DateTimeValue time_value;
-    time_value.set_time(2020, 2, 28, 0, 0, 0, 0);
-    time_value.to_datetime_val(&timestamp);
-
-    BooleanVal conds[4] = {false, false, false, false};
-    AggregateFunctions::window_funnel_update(context, window, mode, timestamp, 4, conds,
-                                             &stringVal1);
-
-    IntVal v = AggregateFunctions::window_funnel_finalize(context, stringVal1);
-    EXPECT_EQ(v.val, 0);
-    delete futil;
-}
-
-TEST_F(WindowFunnelTest, testNoEvent) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    StringVal stringVal1;
-    BigIntVal window(0);
-    StringVal mode("default");
-    std::vector<doris_udf::AnyVal*> constant_args;
-    constant_args.emplace_back(&window);
-    constant_args.emplace_back(&mode);
-    context->impl()->set_constant_args(std::move(constant_args));
-
-    AggregateFunctions::window_funnel_init(context, &stringVal1);
-
-    IntVal v = AggregateFunctions::window_funnel_finalize(context, stringVal1);
-    EXPECT_EQ(v.val, 0);
-
-    StringVal stringVal2;
-    AggregateFunctions::window_funnel_init(context, &stringVal2);
-
-    v = AggregateFunctions::window_funnel_finalize(context, stringVal2);
-    EXPECT_EQ(v.val, 0);
-
-    delete futil;
-}
-
-TEST_F(WindowFunnelTest, testInputNull) {
-    FunctionUtils* futil = new FunctionUtils();
-    doris_udf::FunctionContext* context = futil->get_fn_ctx();
-
-    BigIntVal window(0);
-    StringVal mode("default");
-    std::vector<doris_udf::AnyVal*> constant_args;
-    constant_args.emplace_back(&window);
-    constant_args.emplace_back(&mode);
-    context->impl()->set_constant_args(std::move(constant_args));
-
-    StringVal stringVal1;
-    AggregateFunctions::window_funnel_init(context, &stringVal1);
-
-    DateTimeVal timestamp = DateTimeVal::null();
-    BooleanVal conds[4] = {false, false, false, false};
-    AggregateFunctions::window_funnel_update(context, window, mode, timestamp, 4, conds,
-                                             &stringVal1);
-
-    IntVal v = AggregateFunctions::window_funnel_finalize(context, stringVal1);
-    EXPECT_EQ(v.val, 0);
-
-    delete futil;
-}
-
-} // namespace doris
diff --git a/be/test/olap/byte_buffer_test.cpp b/be/test/olap/byte_buffer_test.cpp
deleted file mode 100644
index b5699cadc7..0000000000
--- a/be/test/olap/byte_buffer_test.cpp
+++ /dev/null
@@ -1,190 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "olap/byte_buffer.h"
-
-#include <gtest/gtest.h>
-#include <sys/mman.h>
-
-#include <filesystem>
-
-#include "common/configbase.h"
-#include "olap/file_helper.h"
-
-namespace doris {
-using namespace ErrorCode;
-
-class TestByteBuffer : public testing::Test {
-public:
-    virtual ~TestByteBuffer() {}
-    virtual void SetUp() {}
-    virtual void TearDown() {
-        if (std::filesystem::exists(".test_byte_buffer")) {
-            EXPECT_TRUE(std::filesystem::remove_all(".test_byte_buffer"));
-        }
-    }
-};
-
-// 测试基本的读写功能
-TEST_F(TestByteBuffer, TestReadWrite) {
-    StorageByteBuffer* buf1 = nullptr;
-
-    buf1 = StorageByteBuffer::create(100);
-    EXPECT_TRUE(buf1 != nullptr);
-
-    char in[10] = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'};
-    for (int i = 0; i < 5; i++) {
-        EXPECT_EQ(Status::OK(), buf1->put(in, sizeof(in)));
-        EXPECT_EQ(100u - (i + 1) * sizeof(in), buf1->remaining());
-        EXPECT_EQ((i + 1) * sizeof(in), buf1->position());
-    }
-
-    // 参数错误的指定写
-    EXPECT_EQ(Status::Error<OUT_OF_BOUND>(), buf1->put(in, sizeof(in), 5, 10));
-
-    for (int i = 0; i < 50; i++) {
-        EXPECT_EQ(Status::OK(), buf1->put(i));
-        EXPECT_EQ(50u - (i + 1), buf1->remaining());
-        EXPECT_EQ(50u + i + 1, buf1->position());
-    }
-
-    // 再写就失败了
-    EXPECT_EQ(Status::Error<BUFFER_OVERFLOW>(), buf1->put(0));
-    EXPECT_EQ(Status::Error<BUFFER_OVERFLOW>(), buf1->put(in, sizeof(in)));
-
-    // 转为读模式
-    buf1->flip();
-
-    for (int i = 0; i < 5; i++) {
-        for (int j = 0; j < 10; j++) {
-            char byte;
-            EXPECT_EQ(Status::OK(), buf1->get(&byte));
-            EXPECT_EQ(100u - (i * 10 + j + 1), buf1->remaining());
-            EXPECT_EQ(i * 10 + j + 1, buf1->position());
-            EXPECT_EQ('a' + j, byte);
-        }
-    }
-    char buf[50];
-    EXPECT_EQ(Status::Error<OUT_OF_BOUND>(), buf1->get(buf, 100));
-    EXPECT_EQ(Status::Error<BUFFER_OVERFLOW>(), buf1->get(buf, 10, 50));
-    EXPECT_EQ(Status::OK(), buf1->get(buf, sizeof(buf)));
-    EXPECT_EQ(0u, buf1->remaining());
-    EXPECT_EQ(100u, buf1->position());
-
-    for (int i = 0; i < 50; i++) {
-        EXPECT_EQ(i, buf[i]);
-    }
-    char byte;
-    EXPECT_EQ(Status::Error<OUT_OF_BOUND>(), buf1->get(&byte));
-    EXPECT_EQ(Status::Error<OUT_OF_BOUND>(), buf1->get(&byte, 1));
-
-    EXPECT_EQ(Status::OK(), buf1->put(10, 'x'));
-    EXPECT_EQ(Status::OK(), buf1->get(10, &byte));
-    EXPECT_EQ('x', byte);
-
-    EXPECT_EQ(Status::OK(), buf1->set_limit(11));
-    EXPECT_EQ(11u, buf1->limit());
-    EXPECT_EQ(11u, buf1->position());
-    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(), buf1->set_limit(101));
-    EXPECT_EQ(Status::OK(), buf1->set_position(10));
-    EXPECT_EQ(Status::OK(), buf1->get(&byte));
-    EXPECT_EQ('x', byte);
-    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(), buf1->set_position(12));
-
-    SAFE_DELETE(buf1);
-}
-
-// 测试ByteBuffer对内存的引用, 尤其是智能指针的引用传递
-// 使用valgrind进行内存泄露检查
-TEST_F(TestByteBuffer, TestRef) {
-    StorageByteBuffer* buf1 = nullptr;
-
-    buf1 = StorageByteBuffer::create(1000);
-    EXPECT_TRUE(buf1 != nullptr);
-
-    for (int i = 0; i < 256; i++) {
-        EXPECT_EQ(Status::OK(), buf1->put(i));
-    }
-    StorageByteBuffer buf2 = *buf1;
-    EXPECT_EQ(buf2.array(), buf1->array());
-    StorageByteBuffer buf4(*buf1);
-    EXPECT_EQ(buf2.array(), buf1->array());
-
-    StorageByteBuffer* buf3 = nullptr;
-    buf3 = StorageByteBuffer::reference_buffer(buf1, 10, 90);
-
-    EXPECT_EQ(90u, buf3->capacity());
-    EXPECT_EQ(90u, buf3->limit());
-    EXPECT_EQ(0u, buf3->position());
-
-    for (int i = 0; i < 90; i++) {
-        char byte;
-        EXPECT_EQ(Status::OK(), buf3->get(&byte));
-        EXPECT_EQ(i + 10, byte);
-    }
-
-    EXPECT_EQ(4u, buf1->_buf.use_count());
-
-    SAFE_DELETE(buf1);
-    SAFE_DELETE(buf3);
-    EXPECT_EQ(2u, buf2._buf.use_count());
-}
-
-TEST_F(TestByteBuffer, TestMmap) {
-    FileHandler file_handle;
-    std::string file_name = ".test_byte_buffer";
-    Status res = file_handle.open_with_mode(file_name, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
-    EXPECT_EQ(Status::OK(), res);
-
-    char buf[100];
-    for (int i = 0; i < 100; i++) {
-        buf[i] = i;
-    }
-    EXPECT_EQ(Status::OK(), file_handle.write(buf, 100));
-    file_handle.close();
-
-    res = file_handle.open(file_name, O_RDWR);
-    EXPECT_EQ(Status::OK(), res);
-    StorageByteBuffer* buf1 = StorageByteBuffer::mmap(nullptr, 80, PROT_READ | PROT_WRITE,
-                                                      MAP_SHARED, file_handle.fd(), 0);
-    // mmap完成后就可以关闭原fd
-    file_handle.close();
-    EXPECT_TRUE(buf1 != nullptr);
-
-    for (int i = 0; i < 80; i++) {
-        char byte;
-        EXPECT_EQ(Status::OK(), buf1->get(&byte));
-        EXPECT_EQ(i, byte);
-    }
-
-    // 测试通过mmap写入数据
-    buf1->set_position(0);
-    for (int i = 0; i < 10; i++) {
-        EXPECT_EQ(Status::OK(), buf1->put('x'));
-    }
-
-    SAFE_DELETE(buf1);
-
-    res = file_handle.open(file_name, O_RDONLY);
-    EXPECT_EQ(Status::OK(), res);
-    EXPECT_EQ(Status::OK(), file_handle.pread(buf, 10, SEEK_SET));
-    for (int i = 0; i < 10; i++) {
-        EXPECT_EQ('x', buf[i]);
-    }
-}
-
-} // namespace doris
diff --git a/be/test/util/decompress_test.cpp b/be/test/util/decompress_test.cpp
index 0e06a76ad8..2e52825a23 100644
--- a/be/test/util/decompress_test.cpp
+++ b/be/test/util/decompress_test.cpp
@@ -24,7 +24,6 @@
 #include <iostream>
 
 #include "gen_cpp/Descriptors_types.h"
-#include "util/compress.h"
 
 using namespace std;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org