You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/05/07 02:55:20 UTC

[incubator-doris] branch master updated: [Enhancement] [Vectorized] Refactor and optimize BinaryOperation (#9087)

This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 98bfeaf560 [Enhancement] [Vectorized] Refactor and optimize BinaryOperation (#9087)
98bfeaf560 is described below

commit 98bfeaf560f7ac04f2324499e4dc7cc3bbf0a437
Author: Pxl <95...@qq.com>
AuthorDate: Sat May 7 10:55:15 2022 +0800

    [Enhancement] [Vectorized] Refactor and optimize BinaryOperation (#9087)
---
 be/src/vec/data_types/data_type_decimal.h          |   6 +-
 be/src/vec/data_types/number_traits.h              |  87 +-
 be/src/vec/functions/divide.cpp                    |  34 +-
 be/src/vec/functions/function.h                    |   9 -
 be/src/vec/functions/function_binary_arithmetic.h  | 881 ++++++++++++---------
 .../function_binary_arithmetic_to_null_type.h      | 247 ------
 be/src/vec/functions/function_bit.cpp              |   8 +-
 be/src/vec/functions/function_cast.h               |   1 -
 be/src/vec/functions/int_div.cpp                   | 118 +--
 be/src/vec/functions/int_div.h                     |  36 +-
 be/src/vec/functions/math.cpp                      |  39 +-
 be/src/vec/functions/minus.cpp                     |   2 +-
 be/src/vec/functions/modulo.cpp                    | 180 ++---
 be/src/vec/functions/multiply.cpp                  |   2 +-
 be/src/vec/functions/plus.cpp                      |   2 +-
 15 files changed, 676 insertions(+), 976 deletions(-)

diff --git a/be/src/vec/data_types/data_type_decimal.h b/be/src/vec/data_types/data_type_decimal.h
index d701ecee71..0613653324 100644
--- a/be/src/vec/data_types/data_type_decimal.h
+++ b/be/src/vec/data_types/data_type_decimal.h
@@ -106,10 +106,12 @@ public:
         }
 
         // Now, Doris only support precision:27, scale: 9
-        DCHECK(precision_ == 27);
-        DCHECK(scale_ == 9);
+        DCHECK(precision == 27);
+        DCHECK(scale == 9);
     }
 
+    DataTypeDecimal(const DataTypeDecimal& rhs) : precision(rhs.precision), scale(rhs.scale) {}
+
     const char* get_family_name() const override { return "Decimal"; }
     std::string do_get_name() const override;
     TypeIndex get_type_id() const override { return TypeId<T>::value; }
diff --git a/be/src/vec/data_types/number_traits.h b/be/src/vec/data_types/number_traits.h
index 70830bfcd1..8b87e55d93 100644
--- a/be/src/vec/data_types/number_traits.h
+++ b/be/src/vec/data_types/number_traits.h
@@ -22,6 +22,8 @@
 
 #include <type_traits>
 
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_vector.h"
 #include "vec/common/uint128.h"
 #include "vec/core/types.h"
 
@@ -155,7 +157,8 @@ struct ResultOfSubtraction {
     */
 template <typename A, typename B>
 struct ResultOfFloatingPointDivision {
-    using Type = Float64;
+    using Type = std::conditional_t<IsDecimalNumber<A>, A,
+                                    std::conditional_t<IsDecimalNumber<B>, B, Float64>>;
 };
 
 /** For integer division, we get a number with the same number of bits as in divisible.
@@ -171,13 +174,8 @@ struct ResultOfIntegerDivision {
 template <typename A, typename B>
 struct ResultOfModulo {
     using Type = typename Construct<std::is_signed_v<A> || std::is_signed_v<B>,
-                                    std::is_floating_point_v<A>, max(sizeof(A), sizeof(B))>::Type;
-};
-
-template <typename A>
-struct ResultOfNegate {
-    using Type = typename Construct<true, std::is_floating_point_v<A>,
-                                    std::is_signed_v<A> ? sizeof(A) : next_size(sizeof(A))>::Type;
+                                    std::is_floating_point_v<A> || std::is_floating_point_v<B>,
+                                    max(sizeof(A), sizeof(B))>::Type;
 };
 
 template <typename A>
@@ -200,76 +198,15 @@ struct ResultOfBitNot {
     using Type = typename Construct<std::is_signed_v<A>, false, sizeof(A)>::Type;
 };
 
-/** Type casting for `if` function:
-  * UInt<x>,  UInt<y>   ->  UInt<max(x,y)>
-  * Int<x>,   Int<y>    ->   Int<max(x,y)>
-  * Float<x>, Float<y>  -> Float<max(x, y)>
-  * UInt<x>,  Int<y>    ->   Int<max(x*2, y)>
-  * Float<x>, [U]Int<y> -> Float<max(x, y*2)>
-  * Decimal<x>, Decimal<y> -> Decimal<max(x,y)>
-  * UUID, UUID          -> UUID
-  * UInt64 ,  Int<x>    -> Error
-  * Float<x>, [U]Int64  -> Error
-  */
 template <typename A, typename B>
-struct ResultOfIf {
-    static constexpr bool has_float = std::is_floating_point_v<A> || std::is_floating_point_v<B>;
-    static constexpr bool has_integer = std::is_integral_v<A> || std::is_integral_v<B>;
-    static constexpr bool has_signed = std::is_signed_v<A> || std::is_signed_v<B>;
-    static constexpr bool has_unsigned = !std::is_signed_v<A> || !std::is_signed_v<B>;
-
-    static constexpr size_t max_size_of_unsigned_integer =
-            max(std::is_signed_v<A> ? 0 : sizeof(A), std::is_signed_v<B> ? 0 : sizeof(B));
-    static constexpr size_t max_size_of_signed_integer =
-            max(std::is_signed_v<A> ? sizeof(A) : 0, std::is_signed_v<B> ? sizeof(B) : 0);
-    static constexpr size_t max_size_of_integer =
-            max(std::is_integral_v<A> ? sizeof(A) : 0, std::is_integral_v<B> ? sizeof(B) : 0);
-    static constexpr size_t max_size_of_float = max(std::is_floating_point_v<A> ? sizeof(A) : 0,
-                                                    std::is_floating_point_v<B> ? sizeof(B) : 0);
-
-    using ConstructedType =
-            typename Construct<has_signed, has_float,
-                               ((has_float && has_integer &&
-                                 max_size_of_integer >= max_size_of_float) ||
-                                (has_signed && has_unsigned &&
-                                 max_size_of_unsigned_integer >= max_size_of_signed_integer))
-                                       ? max(sizeof(A), sizeof(B)) * 2
-                                       : max(sizeof(A), sizeof(B))>::Type;
-
-    using ConstructedWithUUID =
-            std::conditional_t<std::is_same_v<A, UInt128> && std::is_same_v<B, UInt128>, A,
-                               ConstructedType>;
-
-    using Type = std::conditional_t<
-            !IsDecimalNumber<A> && !IsDecimalNumber<B>, ConstructedWithUUID,
-            std::conditional_t<IsDecimalNumber<A> && IsDecimalNumber<B>,
-                               std::conditional_t<(sizeof(A) > sizeof(B)), A, B>, Error>>;
+struct BinaryOperatorTraits {
+    using ColumnVectorA = std::conditional_t<IsDecimalNumber<A>, ColumnDecimal<A>, ColumnVector<A>>;
+    using ColumnVectorB = std::conditional_t<IsDecimalNumber<B>, ColumnDecimal<B>, ColumnVector<B>>;
+    using ArrayA = typename ColumnVectorA::Container;
+    using ArrayB = typename ColumnVectorB::Container;
+    using ArrayNull = PaddedPODArray<UInt8>;
 };
 
-/** Before applying operator `%` and bitwise operations, operands are casted to whole numbers. */
-template <typename A>
-struct ToInteger {
-    using Type = typename Construct<std::is_signed_v<A>, false,
-                                    std::is_floating_point_v<A> ? 8 : sizeof(A)>::Type;
-};
-
-// CLICKHOUSE-29. The same depth, different signs
-// NOTE: This case is applied for 64-bit integers only (for backward compatibility), but could be used for any-bit integers
-template <typename A, typename B>
-constexpr bool LeastGreatestSpecialCase = std::is_integral_v<A>&& std::is_integral_v<B> &&
-                                          (8 == sizeof(A) && sizeof(A) == sizeof(B)) &&
-                                          (std::is_signed_v<A> ^ std::is_signed_v<B>);
-
-template <typename A, typename B>
-using ResultOfLeast = std::conditional_t<LeastGreatestSpecialCase<A, B>,
-                                         typename Construct<true, false, sizeof(A)>::Type,
-                                         typename ResultOfIf<A, B>::Type>;
-
-template <typename A, typename B>
-using ResultOfGreatest = std::conditional_t<LeastGreatestSpecialCase<A, B>,
-                                            typename Construct<false, false, sizeof(A)>::Type,
-                                            typename ResultOfIf<A, B>::Type>;
-
 } // namespace NumberTraits
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/functions/divide.cpp b/be/src/vec/functions/divide.cpp
index b6d3a2b35e..a602cb8b33 100644
--- a/be/src/vec/functions/divide.cpp
+++ b/be/src/vec/functions/divide.cpp
@@ -18,7 +18,7 @@
 // https://github.com/ClickHouse/ClickHouse/blob/master/src/Functions/divide.cpp
 // and modified by Doris
 
-#include "vec/functions/function_binary_arithmetic_to_null_type.h"
+#include "vec/functions/function_binary_arithmetic.h"
 #include "vec/functions/simple_function_factory.h"
 
 namespace doris::vectorized {
@@ -28,26 +28,42 @@ static const DecimalV2Value one(1, 0);
 template <typename A, typename B>
 struct DivideFloatingImpl {
     using ResultType = typename NumberTraits::ResultOfFloatingPointDivision<A, B>::Type;
+    using Traits = NumberTraits::BinaryOperatorTraits<A, B>;
+
     static const constexpr bool allow_decimal = true;
 
+    template <typename Result = ResultType>
+    static void apply(const typename Traits::ArrayA& a, B b,
+                      typename ColumnVector<Result>::Container& c,
+                      typename Traits::ArrayNull& null_map) {
+        size_t size = c.size();
+        UInt8 is_null = b == 0;
+        memset(null_map.data(), is_null, size);
+
+        if (!is_null) {
+            for (size_t i = 0; i < size; i++) {
+                c[i] = (double)a[i] / (double)b;
+            }
+        }
+    }
+
     template <typename Result = DecimalV2Value>
-    static inline DecimalV2Value apply(DecimalV2Value a, DecimalV2Value b, NullMap& null_map,
-                                       size_t index) {
-        null_map[index] = b.is_zero();
-        return a / (b.is_zero() ? one : b);
+    static inline DecimalV2Value apply(DecimalV2Value a, DecimalV2Value b, UInt8& is_null) {
+        is_null = b.is_zero();
+        return a / (is_null ? one : b);
     }
 
     template <typename Result = ResultType>
-    static inline Result apply(A a, B b, NullMap& null_map, size_t index) {
-        null_map[index] = b == 0;
-        return static_cast<Result>(a) / (b + (b == 0));
+    static inline Result apply(A a, B b, UInt8& is_null) {
+        is_null = b == 0;
+        return static_cast<Result>(a) / (b + is_null);
     }
 };
 
 struct NameDivide {
     static constexpr auto name = "divide";
 };
-using FunctionDivide = FunctionBinaryArithmeticToNullType<DivideFloatingImpl, NameDivide>;
+using FunctionDivide = FunctionBinaryArithmetic<DivideFloatingImpl, NameDivide, true>;
 
 void register_function_divide(SimpleFunctionFactory& factory) {
     factory.register_function<FunctionDivide>();
diff --git a/be/src/vec/functions/function.h b/be/src/vec/functions/function.h
index 0ea494a06b..802ef6fdf4 100644
--- a/be/src/vec/functions/function.h
+++ b/be/src/vec/functions/function.h
@@ -95,11 +95,6 @@ protected:
       */
     virtual ColumnNumbers get_arguments_that_are_always_constant() const { return {}; }
 
-    /** True if function can be called on default arguments (include Nullable's) and won't throw.
-      * Counterexample: modulo(0, 0)
-      */
-    virtual bool can_be_executed_on_default_arguments() const { return true; }
-
 private:
     Status default_implementation_for_nulls(FunctionContext* context, Block& block,
                                             const ColumnNumbers& args, size_t result,
@@ -386,7 +381,6 @@ public:
     bool use_default_implementation_for_constants() const override { return false; }
     bool use_default_implementation_for_low_cardinality_columns() const override { return true; }
     ColumnNumbers get_arguments_that_are_always_constant() const override { return {}; }
-    bool can_be_executed_on_default_arguments() const override { return true; }
     bool can_be_executed_on_low_cardinality_dictionary() const override {
         return is_deterministic_in_scope_of_query();
     }
@@ -460,9 +454,6 @@ protected:
     ColumnNumbers get_arguments_that_are_always_constant() const final {
         return function->get_arguments_that_are_always_constant();
     }
-    bool can_be_executed_on_default_arguments() const override {
-        return function->can_be_executed_on_default_arguments();
-    }
 
 private:
     std::shared_ptr<IFunction> function;
diff --git a/be/src/vec/functions/function_binary_arithmetic.h b/be/src/vec/functions/function_binary_arithmetic.h
index 744b55a09f..74b7df4260 100644
--- a/be/src/vec/functions/function_binary_arithmetic.h
+++ b/be/src/vec/functions/function_binary_arithmetic.h
@@ -20,167 +20,210 @@
 
 #pragma once
 
-#include "common/logging.h"
+#include "runtime/tuple.h"
 #include "vec/columns/column_const.h"
 #include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
 #include "vec/columns/column_vector.h"
-#include "vec/common/assert_cast.h"
-#include "vec/common/typeid_cast.h"
-#include "vec/data_types/data_type.h"
-#include "vec/data_types/data_type_decimal.h"
-#include "vec/data_types/data_type_number.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/number_traits.h"
 #include "vec/functions/cast_type_to_either.h"
 #include "vec/functions/function.h"
-#include "vec/functions/function_helpers.h"
-#include "vec/functions/int_div.h"
-#include "vec/utils/util.hpp"
 
 namespace doris::vectorized {
 
-/** Arithmetic operations: +, -, *,
-  * Bitwise operations: |, &, ^, ~.
-  * Etc.
-  */
+// Arithmetic operations: +, -, *, |, &, ^, ~
+// need to implement apply(a, b)
 
-template <typename A, typename B, typename Op, typename ResultType_ = typename Op::ResultType>
+// Arithmetic operations (to null type): /, %, intDiv (integer division), log
+// need to implement apply(a, b, is_null) and apply(array_a, b, array_c, null_map)
+// apply(array_a, b, array_c, null_map) is only used by vector_constant
+
+// TODO: the vector_constant optimization does not work on decimal types yet
+
+template <typename, typename>
+struct PlusImpl;
+template <typename, typename>
+struct MinusImpl;
+template <typename, typename>
+struct MultiplyImpl;
+template <typename, typename>
+struct DivideFloatingImpl;
+template <typename, typename>
+struct DivideIntegralImpl;
+template <typename, typename>
+struct ModuloImpl;
+
+template <template <typename, typename> typename Operation>
+struct OperationTraits {
+    using T = UInt8;
+    static constexpr bool is_plus_minus = std::is_same_v<Operation<T, T>, PlusImpl<T, T>> ||
+                                          std::is_same_v<Operation<T, T>, MinusImpl<T, T>>;
+    static constexpr bool is_multiply = std::is_same_v<Operation<T, T>, MultiplyImpl<T, T>>;
+    static constexpr bool is_division = std::is_same_v<Operation<T, T>, DivideFloatingImpl<T, T>> ||
+                                        std::is_same_v<Operation<T, T>, DivideIntegralImpl<T, T>>;
+    static constexpr bool allow_decimal =
+            std::is_same_v<Operation<T, T>, PlusImpl<T, T>> ||
+            std::is_same_v<Operation<T, T>, MinusImpl<T, T>> ||
+            std::is_same_v<Operation<T, T>, MultiplyImpl<T, T>> ||
+            std::is_same_v<Operation<T, T>, ModuloImpl<T, T>> ||
+            std::is_same_v<Operation<T, T>, DivideFloatingImpl<T, T>> ||
+            std::is_same_v<Operation<T, T>, DivideIntegralImpl<T, T>>;
+    static constexpr bool can_overflow = is_plus_minus || is_multiply;
+};
+
+template <typename A, typename B, typename Op, typename ResultType = typename Op::ResultType>
 struct BinaryOperationImplBase {
-    using ResultType = ResultType_;
+    using Traits = NumberTraits::BinaryOperatorTraits<A, B>;
+    using ColumnVectorResult =
+            std::conditional_t<IsDecimalNumber<ResultType>, ColumnDecimal<ResultType>,
+                               ColumnVector<ResultType>>;
 
-    static void NO_INLINE vector_vector(const PaddedPODArray<A>& a, const PaddedPODArray<B>& b,
-                                        PaddedPODArray<ResultType>& c) {
+    static void vector_vector(const PaddedPODArray<A>& a, const PaddedPODArray<B>& b,
+                              PaddedPODArray<ResultType>& c) {
         size_t size = a.size();
         for (size_t i = 0; i < size; ++i) {
             c[i] = Op::template apply<ResultType>(a[i], b[i]);
         }
     }
 
-    static void NO_INLINE vector_vector(const PaddedPODArray<A>& a, const PaddedPODArray<B>& b,
-                                        PaddedPODArray<ResultType>& c, NullMap& null_map) {
+    static void vector_vector(const PaddedPODArray<A>& a, const PaddedPODArray<B>& b,
+                              PaddedPODArray<ResultType>& c, PaddedPODArray<UInt8>& null_map) {
         size_t size = a.size();
         for (size_t i = 0; i < size; ++i) {
-            c[i] = Op::template apply<ResultType>(a[i], b[i], null_map, i);
+            c[i] = Op::template apply<ResultType>(a[i], b[i], null_map[i]);
         }
     }
 
-    static void NO_INLINE vector_constant(const PaddedPODArray<A>& a, B b,
-                                          PaddedPODArray<ResultType>& c) {
+    static void vector_constant(const PaddedPODArray<A>& a, B b, PaddedPODArray<ResultType>& c) {
         size_t size = a.size();
-        for (size_t i = 0; i < size; ++i) c[i] = Op::template apply<ResultType>(a[i], b);
+        for (size_t i = 0; i < size; ++i) {
+            c[i] = Op::template apply<ResultType>(a[i], b);
+        }
     }
 
-    static void NO_INLINE constant_vector(A a, const PaddedPODArray<B>& b,
-                                          PaddedPODArray<ResultType>& c) {
+    static void vector_constant(const PaddedPODArray<A>& a, B b, PaddedPODArray<ResultType>& c,
+                                PaddedPODArray<UInt8>& null_map) {
+        Op::template apply<ResultType>(a, b, c, null_map);
+    }
+
+    static void constant_vector(A a, const PaddedPODArray<B>& b, PaddedPODArray<ResultType>& c) {
         size_t size = b.size();
-        for (size_t i = 0; i < size; ++i) c[i] = Op::template apply<ResultType>(a, b[i]);
+        for (size_t i = 0; i < size; ++i) {
+            c[i] = Op::template apply<ResultType>(a, b[i]);
+        }
+    }
+
+    static void constant_vector(A a, const PaddedPODArray<B>& b, PaddedPODArray<ResultType>& c,
+                                PaddedPODArray<UInt8>& null_map) {
+        size_t size = b.size();
+        for (size_t i = 0; i < size; ++i) {
+            c[i] = Op::template apply<ResultType>(a, b[i], null_map[i]);
+        }
     }
 
     static ResultType constant_constant(A a, B b) { return Op::template apply<ResultType>(a, b); }
+
+    static ResultType constant_constant(A a, B b, UInt8& is_null) {
+        return Op::template apply<ResultType>(a, b, is_null);
+    }
 };
 
-template <typename A, typename B, typename Op, typename ResultType = typename Op::ResultType>
-struct BinaryOperationImpl : BinaryOperationImplBase<A, B, Op, ResultType> {};
+template <typename A, typename B, typename Op, bool is_to_null_type,
+          typename ResultType = typename Op::ResultType>
+struct BinaryOperationImpl {
+    using Base = BinaryOperationImplBase<A, B, Op, ResultType>;
+
+    static ColumnPtr adapt_normal_constant_constant(A a, B b) {
+        auto column_result = Base::ColumnVectorResult::create(1);
+
+        if constexpr (is_to_null_type) {
+            auto null_map = ColumnUInt8::create(1, 0);
+            column_result->get_element(0) = Base::constant_constant(a, b, null_map->get_element(0));
+            return ColumnNullable::create(std::move(column_result), std::move(null_map));
+        } else {
+            column_result->get_element(0) = Base::constant_constant(a, b);
+            return column_result;
+        }
+    }
 
-template <typename, typename>
-struct PlusImpl;
-template <typename, typename>
-struct MinusImpl;
-template <typename, typename>
-struct MultiplyImpl;
-template <typename, typename>
-struct DivideFloatingImpl;
-template <typename, typename>
-struct DivideIntegralImpl;
-template <typename, typename>
-struct DivideIntegralOrZeroImpl;
-template <typename, typename>
-struct LeastBaseImpl;
-template <typename, typename>
-struct GreatestBaseImpl;
-template <typename, typename>
-struct ModuloImpl;
+    static ColumnPtr adapt_normal_vector_constant(ColumnPtr column_left, B b) {
+        auto column_left_ptr =
+                check_and_get_column<typename Base::Traits::ColumnVectorA>(column_left);
+        auto column_result = Base::ColumnVectorResult::create(column_left->size());
+        DCHECK(column_left_ptr != nullptr);
+
+        if constexpr (is_to_null_type) {
+            auto null_map = ColumnUInt8::create(column_left->size(), 0);
+            Base::vector_constant(column_left_ptr->get_data(), b, column_result->get_data(),
+                                  null_map->get_data());
+            return ColumnNullable::create(std::move(column_result), std::move(null_map));
+        } else {
+            Base::vector_constant(column_left_ptr->get_data(), b, column_result->get_data());
+            return column_result;
+        }
+    }
+
+    static ColumnPtr adapt_normal_constant_vector(A a, ColumnPtr column_right) {
+        auto column_right_ptr =
+                check_and_get_column<typename Base::Traits::ColumnVectorB>(column_right);
+        auto column_result = Base::ColumnVectorResult::create(column_right->size());
+        DCHECK(column_right_ptr != nullptr);
+
+        if constexpr (is_to_null_type) {
+            auto null_map = ColumnUInt8::create(column_right->size(), 0);
+            Base::constant_vector(a, column_right_ptr->get_data(), column_result->get_data(),
+                                  null_map->get_data());
+            return ColumnNullable::create(std::move(column_result), std::move(null_map));
+        } else {
+            Base::constant_vector(a, column_right_ptr->get_data(), column_result->get_data());
+            return column_result;
+        }
+    }
+
+    static ColumnPtr adapt_normal_vector_vector(ColumnPtr column_left, ColumnPtr column_right) {
+        auto column_left_ptr =
+                check_and_get_column<typename Base::Traits::ColumnVectorA>(column_left);
+        auto column_right_ptr =
+                check_and_get_column<typename Base::Traits::ColumnVectorB>(column_right);
+
+        auto column_result = Base::ColumnVectorResult::create(column_left->size());
+        DCHECK(column_left_ptr != nullptr && column_right_ptr != nullptr);
+
+        if constexpr (is_to_null_type) {
+            auto null_map = ColumnUInt8::create(column_result->size(), 0);
+            Base::vector_vector(column_left_ptr->get_data(), column_right_ptr->get_data(),
+                                column_result->get_data(), null_map->get_data());
+            return ColumnNullable::create(std::move(column_result), std::move(null_map));
+        } else {
+            Base::vector_vector(column_left_ptr->get_data(), column_right_ptr->get_data(),
+                                column_result->get_data());
+            return column_result;
+        }
+    }
+};
 
 /// Binary operations for Decimals need scale args
 /// +|- scale one of args (which scale factor is not 1). ScaleR = oneof(Scale1, Scale2);
 /// *   no agrs scale. ScaleR = Scale1 + Scale2;
 /// /   first arg scale. ScaleR = Scale1 (scale_a = DecimalType<B>::get_scale()).
 template <typename A, typename B, template <typename, typename> typename Operation,
-          typename ResultType_, bool _check_overflow = true>
+          typename ResultType, bool is_to_null_type, bool check_overflow = false>
 struct DecimalBinaryOperation {
-    static constexpr bool is_plus_minus =
-            std::is_same_v<Operation<Int32, Int32>, PlusImpl<Int32, Int32>> ||
-            std::is_same_v<Operation<Int32, Int32>, MinusImpl<Int32, Int32>>;
-    static constexpr bool is_multiply =
-            std::is_same_v<Operation<Int32, Int32>, MultiplyImpl<Int32, Int32>>;
-    static constexpr bool is_float_division =
-            std::is_same_v<Operation<Int32, Int32>, DivideFloatingImpl<Int32, Int32>>;
-    static constexpr bool is_int_division =
-            std::is_same_v<Operation<Int32, Int32>, DivideIntegralImpl<Int32, Int32>> ||
-            std::is_same_v<Operation<Int32, Int32>, DivideIntegralOrZeroImpl<Int32, Int32>>;
-    static constexpr bool is_division = is_float_division || is_int_division;
-    static constexpr bool is_compare =
-            std::is_same_v<Operation<Int32, Int32>, LeastBaseImpl<Int32, Int32>> ||
-            std::is_same_v<Operation<Int32, Int32>, GreatestBaseImpl<Int32, Int32>>;
-    static constexpr bool is_plus_minus_compare = is_plus_minus || is_compare;
-    static constexpr bool can_overflow = is_plus_minus || is_multiply;
+    using OpTraits = OperationTraits<Operation>;
 
-    using ResultType = ResultType_;
     using NativeResultType = typename NativeType<ResultType>::Type;
     using Op = Operation<NativeResultType, NativeResultType>;
 
-    using ColVecA = std::conditional_t<IsDecimalNumber<A>, ColumnDecimal<A>, ColumnVector<A>>;
-    using ColVecB = std::conditional_t<IsDecimalNumber<B>, ColumnDecimal<B>, ColumnVector<B>>;
-    using ArrayA = typename ColVecA::Container;
-    using ArrayB = typename ColVecB::Container;
+    using Traits = NumberTraits::BinaryOperatorTraits<A, B>;
     using ArrayC = typename ColumnDecimal<ResultType>::Container;
-    using SelfNoOverflow = DecimalBinaryOperation<A, B, Operation, ResultType_, false>;
 
-    static void vector_vector(const ArrayA& a, const ArrayB& b, ArrayC& c, ResultType scale_a,
-                              ResultType scale_b, bool check_overflow) {
-        if (check_overflow)
-            vector_vector(a, b, c, scale_a, scale_b);
-        else
-            SelfNoOverflow::vector_vector(a, b, c, scale_a, scale_b);
-    }
-
-    /// null_map for divide and mod
-    static void vector_vector(const ArrayA& a, const ArrayB& b, ArrayC& c, ResultType scale_a,
-                              ResultType scale_b, bool check_overflow, NullMap& null_map) {
-        if (check_overflow)
-            vector_vector(a, b, c, scale_a, scale_b, null_map);
-        else
-            SelfNoOverflow::vector_vector(a, b, c, scale_a, scale_b, null_map);
-    }
-
-    static void vector_constant(const ArrayA& a, B b, ArrayC& c, ResultType scale_a,
-                                ResultType scale_b, bool check_overflow) {
-        if (check_overflow)
-            vector_constant(a, b, c, scale_a, scale_b);
-        else
-            SelfNoOverflow::vector_constant(a, b, c, scale_a, scale_b);
-    }
-
-    static void constant_vector(A a, const ArrayB& b, ArrayC& c, ResultType scale_a,
-                                ResultType scale_b, bool check_overflow) {
-        if (check_overflow)
-            constant_vector(a, b, c, scale_a, scale_b);
-        else
-            SelfNoOverflow::constant_vector(a, b, c, scale_a, scale_b);
-    }
-
-    static ResultType constant_constant(A a, B b, ResultType scale_a, ResultType scale_b,
-                                        bool check_overflow) {
-        if (check_overflow)
-            return constant_constant(a, b, scale_a, scale_b);
-        else
-            return SelfNoOverflow::constant_constant(a, b, scale_a, scale_b);
-    }
-
-    static void NO_INLINE vector_vector(const ArrayA& a, const ArrayB& b, ArrayC& c,
-                                        ResultType scale_a [[maybe_unused]],
-                                        ResultType scale_b [[maybe_unused]]) {
+    static void vector_vector(const typename Traits::ArrayA& a, const typename Traits::ArrayB& b,
+                              ArrayC& c, ResultType scale_a [[maybe_unused]],
+                              ResultType scale_b [[maybe_unused]]) {
         size_t size = a.size();
-        if constexpr (is_plus_minus_compare) {
+        if constexpr (OpTraits::is_plus_minus) {
             if (scale_a != 1) {
                 for (size_t i = 0; i < size; ++i) {
                     c[i] = apply_scaled<true>(a[i], b[i], scale_a);
@@ -201,71 +244,207 @@ struct DecimalBinaryOperation {
     }
 
     /// null_map for divide and mod
-    static void NO_INLINE vector_vector(const ArrayA& a, const ArrayB& b, ArrayC& c,
-                                        ResultType scale_a [[maybe_unused]],
-                                        ResultType scale_b [[maybe_unused]], NullMap& null_map) {
+    static void vector_vector(const typename Traits::ArrayA& a, const typename Traits::ArrayB& b,
+                              ArrayC& c, ResultType scale_a [[maybe_unused]],
+                              ResultType scale_b [[maybe_unused]], NullMap& null_map) {
         size_t size = a.size();
 
         /// default: use it if no return before
         for (size_t i = 0; i < size; ++i) {
-            c[i] = apply(a[i], b[i], null_map, i);
+            c[i] = apply(a[i], b[i], null_map[i]);
         }
     }
 
-    static void NO_INLINE vector_constant(const ArrayA& a, B b, ArrayC& c,
-                                          ResultType scale_a [[maybe_unused]],
-                                          ResultType scale_b [[maybe_unused]]) {
+    static void vector_constant(const typename Traits::ArrayA& a, B b, ArrayC& c,
+                                ResultType scale_a [[maybe_unused]],
+                                ResultType scale_b [[maybe_unused]]) {
         size_t size = a.size();
-        if constexpr (is_plus_minus_compare) {
+        if constexpr (OpTraits::is_plus_minus) {
             if (scale_a != 1) {
-                for (size_t i = 0; i < size; ++i) c[i] = apply_scaled<true>(a[i], b, scale_a);
+                for (size_t i = 0; i < size; ++i) {
+                    c[i] = apply_scaled<true>(a[i], b, scale_a);
+                }
                 return;
             } else if (scale_b != 1) {
-                for (size_t i = 0; i < size; ++i) c[i] = apply_scaled<false>(a[i], b, scale_b);
+                for (size_t i = 0; i < size; ++i) {
+                    c[i] = apply_scaled<false>(a[i], b, scale_b);
+                }
                 return;
             }
-        } else if constexpr (is_division && IsDecimalNumber<B>) {
-            for (size_t i = 0; i < size; ++i) c[i] = apply_scaled_div(a[i], b, scale_a);
+        } else if constexpr (OpTraits::is_division && IsDecimalNumber<B>) {
+            for (size_t i = 0; i < size; ++i) {
+                c[i] = apply_scaled_div(a[i], b, scale_a);
+            }
             return;
         }
 
         /// default: use it if no return before
-        for (size_t i = 0; i < size; ++i) c[i] = apply(a[i], b);
+        for (size_t i = 0; i < size; ++i) {
+            c[i] = apply(a[i], b);
+        }
+    }
+
+    static void vector_constant(const typename Traits::ArrayA& a, B b, ArrayC& c,
+                                ResultType scale_a [[maybe_unused]],
+                                ResultType scale_b [[maybe_unused]], NullMap& null_map) {
+        size_t size = a.size();
+        if constexpr (OpTraits::is_division && IsDecimalNumber<B>) {
+            for (size_t i = 0; i < size; ++i) {
+                c[i] = apply_scaled_div(a[i], b, scale_a, null_map[i]);
+            }
+            return;
+        }
+
+        for (size_t i = 0; i < size; ++i) {
+            c[i] = apply(a[i], b, null_map[i]);
+        }
     }
 
-    static void NO_INLINE constant_vector(A a, const ArrayB& b, ArrayC& c,
-                                          ResultType scale_a [[maybe_unused]],
-                                          ResultType scale_b [[maybe_unused]]) {
+    static void constant_vector(A a, const typename Traits::ArrayB& b, ArrayC& c,
+                                ResultType scale_a [[maybe_unused]],
+                                ResultType scale_b [[maybe_unused]]) {
         size_t size = b.size();
-        if constexpr (is_plus_minus_compare) {
+        if constexpr (OpTraits::is_plus_minus) {
             if (scale_a != 1) {
-                for (size_t i = 0; i < size; ++i) c[i] = apply_scaled<true>(a, b[i], scale_a);
+                for (size_t i = 0; i < size; ++i) {
+                    c[i] = apply_scaled<true>(a, b[i], scale_a);
+                }
                 return;
             } else if (scale_b != 1) {
-                for (size_t i = 0; i < size; ++i) c[i] = apply_scaled<false>(a, b[i], scale_b);
+                for (size_t i = 0; i < size; ++i) {
+                    c[i] = apply_scaled<false>(a, b[i], scale_b);
+                }
                 return;
             }
-        } else if constexpr (is_division && IsDecimalNumber<B>) {
-            for (size_t i = 0; i < size; ++i) c[i] = apply_scaled_div(a, b[i], scale_a);
+        } else if constexpr (OpTraits::is_division && IsDecimalNumber<B>) {
+            for (size_t i = 0; i < size; ++i) {
+                c[i] = apply_scaled_div(a, b[i], scale_a);
+            }
             return;
         }
 
         /// default: use it if no return before
-        for (size_t i = 0; i < size; ++i) c[i] = apply(a, b[i]);
+        for (size_t i = 0; i < size; ++i) {
+            c[i] = apply(a, b[i]);
+        }
+    }
+
+    static void constant_vector(A a, const typename Traits::ArrayB& b, ArrayC& c,
+                                ResultType scale_a [[maybe_unused]],
+                                ResultType scale_b [[maybe_unused]], NullMap& null_map) {
+        size_t size = b.size();
+        if constexpr (OpTraits::is_division && IsDecimalNumber<B>) {
+            for (size_t i = 0; i < size; ++i) {
+                c[i] = apply_scaled_div(a, b[i], scale_a, null_map[i]);
+            }
+            return;
+        }
+
+        for (size_t i = 0; i < size; ++i) {
+            c[i] = apply(a, b[i], null_map[i]);
+        }
     }
 
     static ResultType constant_constant(A a, B b, ResultType scale_a [[maybe_unused]],
                                         ResultType scale_b [[maybe_unused]]) {
-        if constexpr (is_plus_minus_compare) {
-            if (scale_a != 1)
+        if constexpr (OpTraits::is_plus_minus) {
+            if (scale_a != 1) {
                 return apply_scaled<true>(a, b, scale_a);
-            else if (scale_b != 1)
+            } else if (scale_b != 1) {
                 return apply_scaled<false>(a, b, scale_b);
-        } else if constexpr (is_division && IsDecimalNumber<B>)
+            }
+        } else if constexpr (OpTraits::is_division && IsDecimalNumber<B>) {
             return apply_scaled_div(a, b, scale_a);
+        }
         return apply(a, b);
     }
 
+    static ResultType constant_constant(A a, B b, ResultType scale_a [[maybe_unused]],
+                                        ResultType scale_b [[maybe_unused]], UInt8& is_null) {
+        if constexpr (OpTraits::is_plus_minus) {
+            if (scale_a != 1) {
+                return apply_scaled<true>(a, b, scale_a, is_null);
+            } else if (scale_b != 1) {
+                return apply_scaled<false>(a, b, scale_b, is_null);
+            }
+        } else if constexpr (OpTraits::is_division && IsDecimalNumber<B>) {
+            return apply_scaled_div(a, b, scale_a, is_null);
+        }
+        return apply(a, b, is_null);
+    }
+
+    static ColumnPtr adapt_decimal_constant_constant(A a, B b, UInt32 scale, ResultType scale_a,
+                                                     ResultType scale_b) {
+        auto column_result = ColumnDecimal<ResultType>::create(1, scale);
+
+        if constexpr (is_to_null_type) {
+            auto null_map = ColumnUInt8::create(1, 0);
+            column_result->get_element(0) =
+                    constant_constant(a, b, scale_a, scale_b, null_map->get_element(0));
+            return ColumnNullable::create(std::move(column_result), std::move(null_map));
+        } else {
+            column_result->get_element(0) = constant_constant(a, b, scale_a, scale_b);
+            return column_result;
+        }
+    }
+
+    static ColumnPtr adapt_decimal_vector_constant(ColumnPtr column_left, B b, UInt32 column_scale,
+                                                   ResultType scale_a, ResultType scale_b) {
+        auto column_left_ptr = check_and_get_column<typename Traits::ColumnVectorA>(column_left);
+        auto column_result = ColumnDecimal<ResultType>::create(column_left->size(), column_scale);
+        DCHECK(column_left_ptr != nullptr);
+
+        if constexpr (is_to_null_type) {
+            auto null_map = ColumnUInt8::create(column_left->size(), 0);
+            vector_constant(column_left_ptr->get_data(), b, column_result->get_data(), scale_a,
+                            scale_b, null_map->get_data());
+            return ColumnNullable::create(std::move(column_result), std::move(null_map));
+        } else {
+            vector_constant(column_left_ptr->get_data(), b, column_result->get_data(), scale_a,
+                            scale_b);
+            return column_result;
+        }
+    }
+
+    static ColumnPtr adapt_decimal_constant_vector(A a, ColumnPtr column_right, UInt32 column_scale,
+                                                   ResultType scale_a, ResultType scale_b) {
+        auto column_right_ptr = check_and_get_column<typename Traits::ColumnVectorB>(column_right);
+        auto column_result = ColumnDecimal<ResultType>::create(column_right->size(), column_scale);
+        DCHECK(column_right_ptr != nullptr);
+
+        if constexpr (is_to_null_type) {
+            auto null_map = ColumnUInt8::create(column_right->size(), 0);
+            constant_vector(a, column_right_ptr->get_data(), column_result->get_data(), scale_a,
+                            scale_b, null_map->get_data());
+            return ColumnNullable::create(std::move(column_result), std::move(null_map));
+        } else {
+            constant_vector(a, column_right_ptr->get_data(), column_result->get_data(), scale_a,
+                            scale_b);
+            return column_result;
+        }
+    }
+
+    static ColumnPtr adapt_decimal_vector_vector(ColumnPtr column_left, ColumnPtr column_right,
+                                                 UInt32 column_scale, ResultType scale_a,
+                                                 ResultType scale_b) {
+        auto column_left_ptr = check_and_get_column<typename Traits::ColumnVectorA>(column_left);
+        auto column_right_ptr = check_and_get_column<typename Traits::ColumnVectorB>(column_right);
+
+        auto column_result = ColumnDecimal<ResultType>::create(column_left->size(), column_scale);
+        DCHECK(column_left_ptr != nullptr && column_right_ptr != nullptr);
+
+        if constexpr (is_to_null_type) {
+            auto null_map = ColumnUInt8::create(column_result->size(), 0);
+            vector_vector(column_left_ptr->get_data(), column_right_ptr->get_data(),
+                          column_result->get_data(), scale_a, scale_b, null_map->get_data());
+            return ColumnNullable::create(std::move(column_result), std::move(null_map));
+        } else {
+            vector_vector(column_left_ptr->get_data(), column_right_ptr->get_data(),
+                          column_result->get_data(), scale_a, scale_b);
+            return column_result;
+        }
+    }
+
 private:
     /// there's implicit type convertion here
     static NativeResultType apply(NativeResultType a, NativeResultType b) {
@@ -280,11 +459,10 @@ private:
     }
 
     /// null_map for divide and mod
-    static NativeResultType apply(NativeResultType a, NativeResultType b, NullMap& null_map,
-                                  size_t index) {
+    static NativeResultType apply(NativeResultType a, NativeResultType b, UInt8& is_null) {
         DecimalV2Value l(a);
         DecimalV2Value r(b);
-        auto ans = Op::template apply(l, r, null_map, index);
+        auto ans = Op::template apply(l, r, is_null);
         NativeResultType result;
         memcpy(&result, &ans, std::min(sizeof(result), sizeof(ans)));
         return result;
@@ -293,30 +471,33 @@ private:
     template <bool scale_left>
     static NativeResultType apply_scaled(NativeResultType a, NativeResultType b,
                                          NativeResultType scale) {
-        if constexpr (is_plus_minus_compare) {
+        if constexpr (OpTraits::is_plus_minus) {
             NativeResultType res;
 
-            if constexpr (_check_overflow) {
+            if constexpr (check_overflow) {
                 bool overflow = false;
-                if constexpr (scale_left)
+                if constexpr (scale_left) {
                     overflow |= common::mul_overflow(a, scale, a);
-                else
+                } else {
                     overflow |= common::mul_overflow(b, scale, b);
+                }
 
-                if constexpr (can_overflow)
+                if constexpr (OpTraits::can_overflow) {
                     overflow |= Op::template apply<NativeResultType>(a, b, res);
-                else
+                } else {
                     res = Op::template apply<NativeResultType>(a, b);
+                }
 
                 if (overflow) {
                     LOG(FATAL) << "Decimal math overflow";
                 }
             } else {
-                if constexpr (scale_left)
+                if constexpr (scale_left) {
                     a *= scale;
-                else
+                } else {
                     b *= scale;
-                res = Op::template apply<NativeResultType>(a, b);
+                }
+                res = apply(a, b);
             }
 
             return res;
@@ -324,23 +505,25 @@ private:
     }
 
     static NativeResultType apply_scaled_div(NativeResultType a, NativeResultType b,
-                                             NativeResultType scale, NullMap& null_map,
-                                             size_t index) {
-        if constexpr (is_division) {
-            if constexpr (_check_overflow) {
+                                             NativeResultType scale, UInt8& is_null) {
+        if constexpr (OpTraits::is_division) {
+            if constexpr (check_overflow) {
                 bool overflow = false;
-                if constexpr (!IsDecimalNumber<A>)
+                if constexpr (!IsDecimalNumber<A>) {
                     overflow |= common::mul_overflow(scale, scale, scale);
+                }
                 overflow |= common::mul_overflow(a, scale, a);
                 if (overflow) {
                     LOG(FATAL) << "Decimal math overflow";
                 }
             } else {
-                if constexpr (!IsDecimalNumber<A>) scale *= scale;
+                if constexpr (!IsDecimalNumber<A>) {
+                    scale *= scale;
+                }
                 a *= scale;
             }
 
-            return Op::template apply<NativeResultType>(a, b, null_map, index);
+            return apply(a, b, is_null);
         }
     }
 };
@@ -362,12 +545,6 @@ constexpr bool IsIntegral = false;
 template <>
 inline constexpr bool IsIntegral<DataTypeUInt8> = true;
 template <>
-inline constexpr bool IsIntegral<DataTypeUInt16> = true;
-template <>
-inline constexpr bool IsIntegral<DataTypeUInt32> = true;
-template <>
-inline constexpr bool IsIntegral<DataTypeUInt64> = true;
-template <>
 inline constexpr bool IsIntegral<DataTypeInt8> = true;
 template <>
 inline constexpr bool IsIntegral<DataTypeInt16> = true;
@@ -378,14 +555,7 @@ inline constexpr bool IsIntegral<DataTypeInt64> = true;
 template <>
 inline constexpr bool IsIntegral<DataTypeInt128> = true;
 
-template <typename DataType>
-constexpr bool IsFloatingPoint = false;
-template <>
-inline constexpr bool IsFloatingPoint<DataTypeFloat32> = true;
-template <>
-inline constexpr bool IsFloatingPoint<DataTypeFloat64> = true;
-
-template <typename T0, typename T1>
+template <typename A, typename B>
 constexpr bool UseLeftDecimal = false;
 template <>
 inline constexpr bool UseLeftDecimal<DataTypeDecimal<Decimal128>, DataTypeDecimal<Decimal32>> =
@@ -403,29 +573,13 @@ using DataTypeFromFieldType =
 template <template <typename, typename> class Operation, typename LeftDataType,
           typename RightDataType>
 struct BinaryOperationTraits {
-    using T0 = typename LeftDataType::FieldType;
-    using T1 = typename RightDataType::FieldType;
-
-private: /// it's not correct for Decimal
-    using Op = Operation<T0, T1>;
-
-public:
-    static constexpr bool allow_decimal =
-            std::is_same_v<Operation<T0, T0>, PlusImpl<T0, T0>> ||
-            std::is_same_v<Operation<T0, T0>, MinusImpl<T0, T0>> ||
-            std::is_same_v<Operation<T0, T0>, MultiplyImpl<T0, T0>> ||
-            std::is_same_v<Operation<T0, T0>, ModuloImpl<T0, T0>> ||
-            std::is_same_v<Operation<T0, T0>, DivideFloatingImpl<T0, T0>> ||
-            std::is_same_v<Operation<T0, T0>, DivideIntegralImpl<T0, T0>> ||
-            std::is_same_v<Operation<T0, T0>, DivideIntegralOrZeroImpl<T0, T0>> ||
-            std::is_same_v<Operation<T0, T0>, LeastBaseImpl<T0, T0>> ||
-            std::is_same_v<Operation<T0, T0>, GreatestBaseImpl<T0, T0>>;
-
+    using Op = Operation<typename LeftDataType::FieldType, typename RightDataType::FieldType>;
+    using OpTraits = OperationTraits<Operation>;
     /// Appropriate result type for binary operator on numeric types. "Date" can also mean
     /// DateTime, but if both operands are Dates, their type must be the same (e.g. Date - DateTime is invalid).
     using ResultDataType = Switch<
             /// Decimal cases
-            Case<!allow_decimal &&
+            Case<!OpTraits::allow_decimal &&
                          (IsDataTypeDecimal<LeftDataType> || IsDataTypeDecimal<RightDataType>),
                  InvalidType>,
             Case<IsDataTypeDecimal<LeftDataType> && IsDataTypeDecimal<RightDataType> &&
@@ -451,20 +605,135 @@ public:
                  DataTypeFromFieldType<typename Op::ResultType>>>;
 };
 
-template <template <typename, typename> class Op, typename Name,
-          bool CanBeExecutedOnDefaultArguments = true>
+template <typename LeftDataType, typename RightDataType,
+          template <typename, typename> class Operation, bool is_to_null_type>
+struct ConstOrVectorAdapter {
+    static constexpr bool result_is_decimal =
+            IsDataTypeDecimal<LeftDataType> || IsDataTypeDecimal<RightDataType>;
+    using OpTraits = OperationTraits<Operation>;
+
+    using ResultDataType =
+            typename BinaryOperationTraits<Operation, LeftDataType, RightDataType>::ResultDataType;
+    using ResultType = typename ResultDataType::FieldType;
+    using A = typename LeftDataType::FieldType;
+    using B = typename RightDataType::FieldType;
+
+    using OperationImpl = std::conditional_t<
+            IsDataTypeDecimal<ResultDataType>,
+            DecimalBinaryOperation<A, B, Operation, ResultType, is_to_null_type>,
+            BinaryOperationImpl<A, B, Operation<A, B>, is_to_null_type, ResultType>>;
+
+    static ColumnPtr execute(ColumnPtr column_left, ColumnPtr column_right,
+                             const LeftDataType& type_left, const RightDataType& type_right) {
+        bool is_const_left = is_column_const(*column_left);
+        bool is_const_right = is_column_const(*column_right);
+
+        if (is_const_left && is_const_right) {
+            return constant_constant(column_left, column_right, type_left, type_right);
+        } else if (is_const_left) {
+            return constant_vector(column_left, column_right, type_left, type_right);
+        } else if (is_const_right) {
+            return vector_constant(column_left, column_right, type_left, type_right);
+        } else {
+            return vector_vector(column_left, column_right, type_left, type_right);
+        }
+    }
+
+private:
+    static auto get_decimal_infos(const LeftDataType& type_left, const RightDataType& type_right) {
+        ResultDataType type = decimal_result_type(type_left, type_right, OpTraits::is_multiply,
+                                                  OpTraits::is_division);
+        typename ResultDataType::FieldType scale_a =
+                type.scale_factor_for(type_left, OpTraits::is_multiply);
+        typename ResultDataType::FieldType scale_b =
+                type.scale_factor_for(type_right, OpTraits::is_multiply || OpTraits::is_division);
+        return std::make_tuple(type, scale_a, scale_b);
+    }
+
+    static ColumnPtr constant_constant(ColumnPtr column_left, ColumnPtr column_right,
+                                       const LeftDataType& type_left,
+                                       const RightDataType& type_right) {
+        auto column_left_ptr = check_and_get_column<ColumnConst>(column_left);
+        auto column_right_ptr = check_and_get_column<ColumnConst>(column_right);
+        DCHECK(column_left_ptr != nullptr && column_right_ptr != nullptr);
+
+        ColumnPtr column_result = nullptr;
+
+        if constexpr (result_is_decimal) {
+            auto [type, scale_a, scale_b] = get_decimal_infos(type_left, type_right);
+
+            column_result = OperationImpl::adapt_decimal_constant_constant(
+                    column_left_ptr->template get_value<A>(),
+                    column_right_ptr->template get_value<B>(), type.get_scale(), scale_a, scale_b);
+
+        } else {
+            column_result = OperationImpl::adapt_normal_constant_constant(
+                    column_left_ptr->template get_value<A>(),
+                    column_right_ptr->template get_value<B>());
+        }
+
+        return ColumnConst::create(std::move(column_result), column_left->size());
+    }
+
+    static ColumnPtr vector_constant(ColumnPtr column_left, ColumnPtr column_right,
+                                     const LeftDataType& type_left,
+                                     const RightDataType& type_right) {
+        auto column_right_ptr = check_and_get_column<ColumnConst>(column_right);
+        DCHECK(column_right_ptr != nullptr);
+
+        if constexpr (result_is_decimal) {
+            auto [type, scale_a, scale_b] = get_decimal_infos(type_left, type_right);
+
+            return OperationImpl::adapt_decimal_vector_constant(
+                    column_left->get_ptr(), column_right_ptr->template get_value<B>(),
+                    type.get_scale(), scale_a, scale_b);
+        } else {
+            return OperationImpl::adapt_normal_vector_constant(
+                    column_left->get_ptr(), column_right_ptr->template get_value<B>());
+        }
+    }
+
+    static ColumnPtr constant_vector(ColumnPtr column_left, ColumnPtr column_right,
+                                     const LeftDataType& type_left,
+                                     const RightDataType& type_right) {
+        auto column_left_ptr = check_and_get_column<ColumnConst>(column_left);
+        DCHECK(column_left_ptr != nullptr);
+
+        if constexpr (result_is_decimal) {
+            auto [type, scale_a, scale_b] = get_decimal_infos(type_left, type_right);
+
+            return OperationImpl::adapt_decimal_constant_vector(
+                    column_left_ptr->template get_value<A>(), column_right->get_ptr(),
+                    type.get_scale(), scale_a, scale_b);
+        } else {
+            return OperationImpl::adapt_normal_constant_vector(
+                    column_left_ptr->template get_value<A>(), column_right->get_ptr());
+        }
+    }
+
+    static ColumnPtr vector_vector(ColumnPtr column_left, ColumnPtr column_right,
+                                   const LeftDataType& type_left, const RightDataType& type_right) {
+        if constexpr (result_is_decimal) {
+            auto [type, scale_a, scale_b] = get_decimal_infos(type_left, type_right);
+
+            return OperationImpl::adapt_decimal_vector_vector(column_left->get_ptr(),
+                                                              column_right->get_ptr(),
+                                                              type.get_scale(), scale_a, scale_b);
+        } else {
+            return OperationImpl::adapt_normal_vector_vector(column_left->get_ptr(),
+                                                             column_right->get_ptr());
+        }
+    }
+};
+
+template <template <typename, typename> class Operation, typename Name, bool is_to_null_type>
 class FunctionBinaryArithmetic : public IFunction {
-    bool check_decimal_overflow = true;
-    static constexpr bool has_variadic_argument =
-            !std::is_void_v<decltype(has_variadic_argument_types(std::declval<Op<int, int>>()))>;
+    using OpTraits = OperationTraits<Operation>;
 
     template <typename F>
     static bool cast_type(const IDataType* type, F&& f) {
-        return cast_type_to_either<DataTypeUInt8, DataTypeUInt16, DataTypeUInt32, DataTypeUInt64,
-                                   DataTypeInt8, DataTypeInt16, DataTypeInt32, DataTypeInt64,
-                                   DataTypeInt128, DataTypeFloat32, DataTypeFloat64,
-                                   //            DataTypeDate,
-                                   //            DataTypeDateTime,
+        return cast_type_to_either<DataTypeUInt8, DataTypeInt8, DataTypeInt16, DataTypeInt32,
+                                   DataTypeInt64, DataTypeInt128, DataTypeFloat32, DataTypeFloat64,
                                    DataTypeDecimal<Decimal32>, DataTypeDecimal<Decimal64>,
                                    DataTypeDecimal<Decimal128>>(type, std::forward<F>(f));
     }
@@ -476,39 +745,17 @@ class FunctionBinaryArithmetic : public IFunction {
         });
     }
 
-    bool is_aggregate_multiply(const DataTypePtr& type0, const DataTypePtr& type1) const {
-        if constexpr (!std::is_same_v<Op<UInt8, UInt8>, MultiplyImpl<UInt8, UInt8>>) return false;
-
-        WhichDataType which0(type0);
-        WhichDataType which1(type1);
-
-        return (which0.is_aggregate_function() && which1.is_native_uint()) ||
-               (which0.is_native_uint() && which1.is_aggregate_function());
-    }
-
-    bool is_aggregate_addition(const DataTypePtr& type0, const DataTypePtr& type1) const {
-        if constexpr (!std::is_same_v<Op<UInt8, UInt8>, PlusImpl<UInt8, UInt8>>) return false;
-
-        WhichDataType which0(type0);
-        WhichDataType which1(type1);
-
-        return which0.is_aggregate_function() && which1.is_aggregate_function();
-    }
-
 public:
     static constexpr auto name = Name::name;
+
     static FunctionPtr create() { return std::make_shared<FunctionBinaryArithmetic>(); }
 
-    FunctionBinaryArithmetic() {}
+    FunctionBinaryArithmetic() = default;
+
     String get_name() const override { return name; }
 
     size_t get_number_of_arguments() const override { return 2; }
 
-    DataTypes get_variadic_argument_types_impl() const override {
-        if constexpr (has_variadic_argument) return Op<int, int>::get_variadic_argument_types();
-        return {};
-    }
-
     DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
         DataTypePtr type_res;
         bool valid = cast_both_types(
@@ -516,29 +763,26 @@ public:
                     using LeftDataType = std::decay_t<decltype(left)>;
                     using RightDataType = std::decay_t<decltype(right)>;
                     using ResultDataType =
-                            typename BinaryOperationTraits<Op, LeftDataType,
+                            typename BinaryOperationTraits<Operation, LeftDataType,
                                                            RightDataType>::ResultDataType;
                     if constexpr (!std::is_same_v<ResultDataType, InvalidType>) {
                         if constexpr (IsDataTypeDecimal<LeftDataType> &&
                                       IsDataTypeDecimal<RightDataType>) {
-                            constexpr bool is_multiply =
-                                    std::is_same_v<Op<UInt8, UInt8>, MultiplyImpl<UInt8, UInt8>>;
-                            constexpr bool is_division = false;
-
-                            ResultDataType result_type =
-                                    decimal_result_type(left, right, is_multiply, is_division);
+                            ResultDataType result_type = decimal_result_type(
+                                    left, right, OpTraits::is_multiply, OpTraits::is_division);
                             type_res = std::make_shared<ResultDataType>(result_type.get_precision(),
                                                                         result_type.get_scale());
-                        } else if constexpr (IsDataTypeDecimal<LeftDataType>)
+                        } else if constexpr (IsDataTypeDecimal<LeftDataType>) {
                             type_res = std::make_shared<LeftDataType>(left.get_precision(),
                                                                       left.get_scale());
-                        else if constexpr (IsDataTypeDecimal<RightDataType>)
+                        } else if constexpr (IsDataTypeDecimal<RightDataType>) {
                             type_res = std::make_shared<RightDataType>(right.get_precision(),
                                                                        right.get_scale());
-                        else if constexpr (IsDataTypeDecimal<ResultDataType>)
+                        } else if constexpr (IsDataTypeDecimal<ResultDataType>) {
                             type_res = std::make_shared<ResultDataType>(27, 9);
-                        else
+                        } else {
                             type_res = std::make_shared<ResultDataType>();
+                        }
                         return true;
                     }
                     return false;
@@ -549,6 +793,10 @@ public:
                                       get_name());
         }
 
+        if constexpr (is_to_null_type) {
+            return make_nullable(type_res);
+        }
+
         return type_res;
     }
 
@@ -556,150 +804,29 @@ public:
                         size_t result, size_t input_rows_count) override {
         auto* left_generic = block.get_by_position(arguments[0]).type.get();
         auto* right_generic = block.get_by_position(arguments[1]).type.get();
+        if (left_generic->is_nullable()) {
+            left_generic =
+                    static_cast<const DataTypeNullable*>(left_generic)->get_nested_type().get();
+        }
+        if (right_generic->is_nullable()) {
+            right_generic =
+                    static_cast<const DataTypeNullable*>(right_generic)->get_nested_type().get();
+        }
+
         bool valid = cast_both_types(
                 left_generic, right_generic, [&](const auto& left, const auto& right) {
                     using LeftDataType = std::decay_t<decltype(left)>;
                     using RightDataType = std::decay_t<decltype(right)>;
                     using ResultDataType =
-                            typename BinaryOperationTraits<Op, LeftDataType,
+                            typename BinaryOperationTraits<Operation, LeftDataType,
                                                            RightDataType>::ResultDataType;
-                    if constexpr (!std::is_same_v<ResultDataType, InvalidType>) {
-                        constexpr bool result_is_decimal =
-                                IsDataTypeDecimal<LeftDataType> || IsDataTypeDecimal<RightDataType>;
-                        constexpr bool is_multiply =
-                                std::is_same_v<Op<UInt8, UInt8>, MultiplyImpl<UInt8, UInt8>>;
-                        constexpr bool is_division = false;
-
-                        using T0 = typename LeftDataType::FieldType;
-                        using T1 = typename RightDataType::FieldType;
-                        using ResultType = typename ResultDataType::FieldType;
-                        using ColVecT0 = std::conditional_t<IsDecimalNumber<T0>, ColumnDecimal<T0>,
-                                                            ColumnVector<T0>>;
-                        using ColVecT1 = std::conditional_t<IsDecimalNumber<T1>, ColumnDecimal<T1>,
-                                                            ColumnVector<T1>>;
-                        using ColVecResult = std::conditional_t<IsDecimalNumber<ResultType>,
-                                                                ColumnDecimal<ResultType>,
-                                                                ColumnVector<ResultType>>;
-
-                        /// Decimal operations need scale. Operations are on result type.
-                        using OpImpl = std::conditional_t<
-                                IsDataTypeDecimal<ResultDataType>,
-                                DecimalBinaryOperation<T0, T1, Op, ResultType>,
-                                BinaryOperationImpl<T0, T1, Op<T0, T1>, ResultType>>;
-
-                        auto col_left_raw = block.get_by_position(arguments[0]).column.get();
-                        auto col_right_raw = block.get_by_position(arguments[1]).column.get();
-                        if (auto col_left = check_and_get_column_const<ColVecT0>(col_left_raw)) {
-                            if (auto col_right =
-                                        check_and_get_column_const<ColVecT1>(col_right_raw)) {
-                                /// the only case with a non-vector result
-                                if constexpr (result_is_decimal) {
-                                    ResultDataType type = decimal_result_type(
-                                            left, right, is_multiply, is_division);
-                                    typename ResultDataType::FieldType scale_a =
-                                            type.scale_factor_for(left, is_multiply);
-                                    typename ResultDataType::FieldType scale_b =
-                                            type.scale_factor_for(right,
-                                                                  is_multiply || is_division);
-
-                                    auto res = OpImpl::constant_constant(
-                                            col_left->template get_value<T0>(),
-                                            col_right->template get_value<T1>(), scale_a, scale_b,
-                                            check_decimal_overflow);
-                                    block.get_by_position(result).column =
-                                            ResultDataType(type.get_precision(), type.get_scale())
-                                                    .create_column_const(
-                                                            col_left->size(),
-                                                            to_field(res, type.get_scale()));
-
-                                } else {
-                                    auto res = OpImpl::constant_constant(
-                                            col_left->template get_value<T0>(),
-                                            col_right->template get_value<T1>());
-                                    block.get_by_position(result).column =
-                                            ResultDataType().create_column_const(col_left->size(),
-                                                                                 to_field(res));
-                                }
-                                return true;
-                            }
-                        }
 
-                        typename ColVecResult::MutablePtr col_res = nullptr;
-                        if constexpr (result_is_decimal) {
-                            ResultDataType type =
-                                    decimal_result_type(left, right, is_multiply, is_division);
-                            col_res = ColVecResult::create(0, type.get_scale());
-                        } else
-                            col_res = ColVecResult::create();
-
-                        auto& vec_res = col_res->get_data();
-                        vec_res.resize(block.rows());
-
-                        if (auto col_left_const =
-                                    check_and_get_column_const<ColVecT0>(col_left_raw)) {
-                            if (auto col_right = check_and_get_column<ColVecT1>(col_right_raw)) {
-                                if constexpr (result_is_decimal) {
-                                    ResultDataType type = decimal_result_type(
-                                            left, right, is_multiply, is_division);
-
-                                    typename ResultDataType::FieldType scale_a =
-                                            type.scale_factor_for(left, is_multiply);
-                                    typename ResultDataType::FieldType scale_b =
-                                            type.scale_factor_for(right,
-                                                                  is_multiply || is_division);
-                                    if constexpr (IsDataTypeDecimal<RightDataType> && is_division)
-                                        scale_a = right.get_scale_multiplier();
-
-                                    OpImpl::constant_vector(
-                                            col_left_const->template get_value<T0>(),
-                                            col_right->get_data(), vec_res, scale_a, scale_b,
-                                            check_decimal_overflow);
-                                } else
-                                    OpImpl::constant_vector(
-                                            col_left_const->template get_value<T0>(),
-                                            col_right->get_data(), vec_res);
-                            } else
-                                return false;
-                        } else if (auto col_left = check_and_get_column<ColVecT0>(col_left_raw)) {
-                            if constexpr (result_is_decimal) {
-                                ResultDataType type =
-                                        decimal_result_type(left, right, is_multiply, is_division);
-
-                                typename ResultDataType::FieldType scale_a =
-                                        type.scale_factor_for(left, is_multiply);
-                                typename ResultDataType::FieldType scale_b =
-                                        type.scale_factor_for(right, is_multiply || is_division);
-                                if (auto col_right =
-                                            check_and_get_column<ColVecT1>(col_right_raw)) {
-                                    OpImpl::vector_vector(col_left->get_data(),
-                                                          col_right->get_data(), vec_res, scale_a,
-                                                          scale_b, check_decimal_overflow);
-                                } else if (auto col_right_const =
-                                                   check_and_get_column_const<ColVecT1>(
-                                                           col_right_raw)) {
-                                    OpImpl::vector_constant(
-                                            col_left->get_data(),
-                                            col_right_const->template get_value<T1>(), vec_res,
-                                            scale_a, scale_b, check_decimal_overflow);
-                                } else
-                                    return false;
-                            } else {
-                                if (auto col_right = check_and_get_column<ColVecT1>(col_right_raw))
-                                    OpImpl::vector_vector(col_left->get_data(),
-                                                          col_right->get_data(), vec_res);
-                                else if (auto col_right_const =
-                                                 check_and_get_column_const<ColVecT1>(
-                                                         col_right_raw))
-                                    OpImpl::vector_constant(
-                                            col_left->get_data(),
-                                            col_right_const->template get_value<T1>(), vec_res);
-                                else
-                                    return false;
-                            }
-                        } else
-                            return false;
-
-                        block.replace_by_position(result, std::move(col_res));
+                    if constexpr (!std::is_same_v<ResultDataType, InvalidType>) {
+                        auto column_result = ConstOrVectorAdapter<LeftDataType, RightDataType,
+                                                                  Operation, is_to_null_type>::
+                                execute(block.get_by_position(arguments[0]).column,
+                                        block.get_by_position(arguments[1]).column, left, right);
+                        block.replace_by_position(result, std::move(column_result));
                         return true;
                     }
                     return false;
@@ -711,10 +838,6 @@ public:
 
         return Status::OK();
     }
-
-    bool can_be_executed_on_default_arguments() const override {
-        return CanBeExecutedOnDefaultArguments;
-    }
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/functions/function_binary_arithmetic_to_null_type.h b/be/src/vec/functions/function_binary_arithmetic_to_null_type.h
deleted file mode 100644
index 66316116ae..0000000000
--- a/be/src/vec/functions/function_binary_arithmetic_to_null_type.h
+++ /dev/null
@@ -1,247 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "vec/functions/function_binary_arithmetic.h"
-
-namespace doris::vectorized {
-
-/**
- * Arithmetic operations: /, %
- * intDiv (integer division)
- */
-
-template <template <typename, typename> class Op, typename Name,
-          bool CanBeExecutedOnDefaultArguments = true>
-class FunctionBinaryArithmeticToNullType : public IFunction {
-    bool check_decimal_overflow = true;
-
-    template <typename F>
-    static bool cast_type(const IDataType* type, F&& f) {
-        return cast_type_to_either<DataTypeUInt8, DataTypeUInt16, DataTypeUInt32, DataTypeUInt64,
-                                   DataTypeInt8, DataTypeInt16, DataTypeInt32, DataTypeInt64,
-                                   DataTypeInt128, DataTypeFloat32, DataTypeFloat64,
-                                   DataTypeDecimal<Decimal32>, DataTypeDecimal<Decimal64>,
-                                   DataTypeDecimal<Decimal128>>(type, std::forward<F>(f));
-    }
-
-    template <typename F>
-    static bool cast_both_types(const IDataType* left, const IDataType* right, F&& f) {
-        return cast_type(left, [&](const auto& left_) {
-            return cast_type(right, [&](const auto& right_) { return f(left_, right_); });
-        });
-    }
-
-public:
-    static constexpr auto name = Name::name;
-    static FunctionPtr create() { return std::make_shared<FunctionBinaryArithmeticToNullType>(); }
-
-    FunctionBinaryArithmeticToNullType() {}
-    String get_name() const override { return name; }
-
-    size_t get_number_of_arguments() const override { return 2; }
-
-    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
-        DataTypePtr type_res;
-
-        const IDataType* first_type = arguments[0].get();
-        const IDataType* secord_type = arguments[1].get();
-        if (first_type->is_nullable()) {
-            first_type = static_cast<const DataTypeNullable*>(first_type)->get_nested_type().get();
-        }
-        if (secord_type->is_nullable()) {
-            secord_type =
-                    static_cast<const DataTypeNullable*>(secord_type)->get_nested_type().get();
-        }
-
-        bool valid =
-                cast_both_types(first_type, secord_type, [&](const auto& left, const auto& right) {
-                    using LeftDataType = std::decay_t<decltype(left)>;
-                    using RightDataType = std::decay_t<decltype(right)>;
-                    using ResultDataType =
-                            typename BinaryOperationTraits<Op, LeftDataType,
-                                                           RightDataType>::ResultDataType;
-                    if constexpr (!std::is_same_v<ResultDataType, InvalidType>) {
-                        if constexpr (IsDataTypeDecimal<LeftDataType> &&
-                                      IsDataTypeDecimal<RightDataType>) {
-                            constexpr bool is_multiply = false;
-                            constexpr bool is_division =
-                                    std::is_same_v<Op<UInt8, UInt8>,
-                                                   DivideFloatingImpl<UInt8, UInt8>> ||
-                                    std::is_same_v<Op<UInt8, UInt8>,
-                                                   DivideIntegralImpl<UInt8, UInt8>> ||
-                                    std::is_same_v<Op<UInt8, UInt8>,
-                                                   DivideIntegralOrZeroImpl<UInt8, UInt8>>;
-
-                            ResultDataType result_type =
-                                    decimal_result_type(left, right, is_multiply, is_division);
-                            type_res = std::make_shared<ResultDataType>(result_type.get_precision(),
-                                                                        result_type.get_scale());
-                        } else if constexpr (IsDataTypeDecimal<LeftDataType>)
-                            type_res = std::make_shared<LeftDataType>(left.get_precision(),
-                                                                      left.get_scale());
-                        else if constexpr (IsDataTypeDecimal<RightDataType>)
-                            type_res = std::make_shared<RightDataType>(right.get_precision(),
-                                                                       right.get_scale());
-                        else if constexpr (IsDataTypeDecimal<ResultDataType>)
-                            type_res = std::make_shared<ResultDataType>(27, 9);
-                        else
-                            type_res = std::make_shared<ResultDataType>();
-                        return true;
-                    }
-                    return false;
-                });
-        if (!valid) {
-            LOG(FATAL) << fmt::format("Illegal types {} and {} of arguments of function {}",
-                                      arguments[0]->get_name(), arguments[1]->get_name(),
-                                      get_name());
-        }
-
-        return make_nullable(type_res);
-    }
-
-    bool use_default_implementation_for_nulls() const override { return true; }
-
-    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
-                        size_t result, size_t input_rows_count) override {
-        auto* left_generic = block.get_by_position(arguments[0]).type.get();
-        auto* right_generic = block.get_by_position(arguments[1]).type.get();
-        if (left_generic->is_nullable()) {
-            left_generic =
-                    static_cast<const DataTypeNullable*>(left_generic)->get_nested_type().get();
-        }
-        if (right_generic->is_nullable()) {
-            right_generic =
-                    static_cast<const DataTypeNullable*>(right_generic)->get_nested_type().get();
-        }
-
-        bool valid = cast_both_types(
-                left_generic, right_generic, [&](const auto& left, const auto& right) {
-                    using LeftDataType = std::decay_t<decltype(left)>;
-                    using RightDataType = std::decay_t<decltype(right)>;
-                    using ResultDataType =
-                            typename BinaryOperationTraits<Op, LeftDataType,
-                                                           RightDataType>::ResultDataType;
-
-                    if constexpr (!std::is_same_v<ResultDataType, InvalidType>) {
-                        constexpr bool result_is_decimal =
-                                IsDataTypeDecimal<LeftDataType> || IsDataTypeDecimal<RightDataType>;
-                        constexpr bool is_multiply = false;
-                        constexpr bool is_division =
-                                std::is_same_v<Op<UInt8, UInt8>,
-                                               DivideFloatingImpl<UInt8, UInt8>> ||
-                                std::is_same_v<Op<UInt8, UInt8>,
-                                               DivideIntegralImpl<UInt8, UInt8>> ||
-                                std::is_same_v<Op<UInt8, UInt8>,
-                                               DivideIntegralOrZeroImpl<UInt8, UInt8>>;
-
-                        using T0 = typename LeftDataType::FieldType;
-                        using T1 = typename RightDataType::FieldType;
-                        using ResultType = typename ResultDataType::FieldType;
-                        using ColVecT0 = std::conditional_t<IsDecimalNumber<T0>, ColumnDecimal<T0>,
-                                                            ColumnVector<T0>>;
-                        using ColVecT1 = std::conditional_t<IsDecimalNumber<T1>, ColumnDecimal<T1>,
-                                                            ColumnVector<T1>>;
-                        using ColVecResult = std::conditional_t<IsDecimalNumber<ResultType>,
-                                                                ColumnDecimal<ResultType>,
-                                                                ColumnVector<ResultType>>;
-
-                        /// Decimal operations need scale. Operations are on result type.
-                        using OpImpl = std::conditional_t<
-                                IsDataTypeDecimal<ResultDataType>,
-                                DecimalBinaryOperation<T0, T1, Op, ResultType>,
-                                BinaryOperationImpl<T0, T1, Op<T0, T1>, ResultType>>;
-
-                        auto null_map = ColumnUInt8::create(input_rows_count, 0);
-                        auto& null_map_data = null_map->get_data();
-                        size_t argument_size = arguments.size();
-                        ColumnPtr argument_columns[argument_size];
-
-                        for (size_t i = 0; i < argument_size; ++i) {
-                            argument_columns[i] =
-                                    block.get_by_position(arguments[i])
-                                            .column->convert_to_full_column_if_const();
-                        }
-
-                        auto col_left_raw = argument_columns[0].get();
-                        auto col_right_raw = argument_columns[1].get();
-
-                        typename ColVecResult::MutablePtr col_res = nullptr;
-                        if constexpr (result_is_decimal) {
-                            ResultDataType type =
-                                    decimal_result_type(left, right, is_multiply, is_division);
-                            col_res = ColVecResult::create(0, type.get_scale());
-                        } else {
-                            col_res = ColVecResult::create();
-                        }
-
-                        auto& vec_res = col_res->get_data();
-                        vec_res.resize(block.rows());
-
-                        if (auto col_left = check_and_get_column<ColVecT0>(col_left_raw)) {
-                            if constexpr (result_is_decimal) {
-                                ResultDataType type =
-                                        decimal_result_type(left, right, is_multiply, is_division);
-
-                                typename ResultDataType::FieldType scale_a =
-                                        type.scale_factor_for(left, is_multiply);
-                                typename ResultDataType::FieldType scale_b =
-                                        type.scale_factor_for(right, is_multiply || is_division);
-                                if constexpr (IsDataTypeDecimal<RightDataType> && is_division)
-                                    scale_a = right.get_scale_multiplier();
-                                if (auto col_right =
-                                            check_and_get_column<ColVecT1>(col_right_raw)) {
-                                    OpImpl::vector_vector(col_left->get_data(),
-                                                          col_right->get_data(), vec_res, scale_a,
-                                                          scale_b, check_decimal_overflow,
-                                                          null_map_data);
-                                }
-                            } else {
-                                if (auto col_right =
-                                            check_and_get_column<ColVecT1>(col_right_raw)) {
-                                    OpImpl::vector_vector(col_left->get_data(),
-                                                          col_right->get_data(), vec_res,
-                                                          null_map_data);
-                                }
-                            }
-                        } else {
-                            return false;
-                        }
-
-                        block.get_by_position(result).column =
-                                ColumnNullable::create(std::move(col_res), std::move(null_map));
-                        return true;
-                    } else {
-                        return false;
-                    }
-                });
-
-        if (!valid) {
-            return Status::RuntimeError(
-                    fmt::format("{}'s arguments do not match the expected data types", get_name()));
-        }
-
-        return Status::OK();
-    }
-
-    bool can_be_executed_on_default_arguments() const override {
-        return CanBeExecutedOnDefaultArguments;
-    }
-};
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/functions/function_bit.cpp b/be/src/vec/functions/function_bit.cpp
index 0f4fb87d27..7af28601a0 100644
--- a/be/src/vec/functions/function_bit.cpp
+++ b/be/src/vec/functions/function_bit.cpp
@@ -20,9 +20,9 @@
 
 #include "vec/data_types/number_traits.h"
 #include "vec/functions/function_binary_arithmetic.h"
+#include "vec/functions/function_totype.h"
 #include "vec/functions/function_unary_arithmetic.h"
 #include "vec/functions/simple_function_factory.h"
-#include "vec/functions/function_totype.h"
 
 namespace doris::vectorized {
 
@@ -101,10 +101,10 @@ struct BitLengthImpl {
     }
 };
 
-using FunctionBitAnd = FunctionBinaryArithmetic<BitAndImpl, NameBitAnd>;
+using FunctionBitAnd = FunctionBinaryArithmetic<BitAndImpl, NameBitAnd, false>;
 using FunctionBitNot = FunctionUnaryArithmetic<BitNotImpl, NameBitNot, false>;
-using FunctionBitOr = FunctionBinaryArithmetic<BitOrImpl, NameBitOr>;
-using FunctionBitXor = FunctionBinaryArithmetic<BitXorImpl, NameBitXor>;
+using FunctionBitOr = FunctionBinaryArithmetic<BitOrImpl, NameBitOr, false>;
+using FunctionBitXor = FunctionBinaryArithmetic<BitXorImpl, NameBitXor, false>;
 using FunctionBitLength = FunctionUnaryToType<BitLengthImpl, NameBitLength>;
 
 void register_function_bit(SimpleFunctionFactory& factory) {
diff --git a/be/src/vec/functions/function_cast.h b/be/src/vec/functions/function_cast.h
index 144b782554..092842f5fa 100644
--- a/be/src/vec/functions/function_cast.h
+++ b/be/src/vec/functions/function_cast.h
@@ -518,7 +518,6 @@ public:
 
     bool use_default_implementation_for_constants() const override { return true; }
     ColumnNumbers get_arguments_that_are_always_constant() const override { return {1}; }
-    bool can_be_executed_on_default_arguments() const override { return false; }
 
     Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
                         size_t result, size_t input_rows_count) override {
diff --git a/be/src/vec/functions/int_div.cpp b/be/src/vec/functions/int_div.cpp
index 6dd50d44f8..59b2d1c2dc 100644
--- a/be/src/vec/functions/int_div.cpp
+++ b/be/src/vec/functions/int_div.cpp
@@ -18,133 +18,17 @@
 // https://github.com/ClickHouse/ClickHouse/blob/master/src/Functions/IntDiv.cpp
 // and modified by Doris
 
-#ifdef __SSE2__
-#define LIBDIVIDE_SSE2 1
-#endif
-
 #include "vec/functions/int_div.h"
 
-#include <libdivide.h>
-
 #include "vec/functions/function_binary_arithmetic.h"
-#include "vec/functions/function_binary_arithmetic_to_null_type.h"
 #include "vec/functions/simple_function_factory.h"
 
 namespace doris::vectorized {
 
-/// Optimizations for integer division by a constant.
-
-template <typename A, typename B>
-struct DivideIntegralByConstantImpl : BinaryOperationImplBase<A, B, DivideIntegralImpl<A, B>> {
-    using ResultType = typename DivideIntegralImpl<A, B>::ResultType;
-
-    static void vector_constant(const PaddedPODArray<A>& a, B b, PaddedPODArray<ResultType>& c) {
-        // TODO: Support return null in the furture
-        if (UNLIKELY(b == 0)) {
-            //            throw Exception("Division by zero", TStatusCode::VEC_ILLEGAL_DIVISION);
-            memset(c.data(), 0, sizeof(ResultType) * c.size());
-            return;
-        }
-
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-
-        if (UNLIKELY(std::is_signed_v<B> && b == -1)) {
-            size_t size = a.size();
-            for (size_t i = 0; i < size; ++i) c[i] = -c[i];
-            return;
-        }
-
-#pragma GCC diagnostic pop
-
-        libdivide::divider<A> divider(b);
-
-        size_t size = a.size();
-        const A* a_pos = a.data();
-        const A* a_end = a_pos + size;
-        ResultType* c_pos = c.data();
-
-#ifdef __SSE2__
-        static constexpr size_t values_per_sse_register = 16 / sizeof(A);
-        const A* a_end_sse = a_pos + size / values_per_sse_register * values_per_sse_register;
-
-        while (a_pos < a_end_sse) {
-            _mm_storeu_si128(reinterpret_cast<__m128i*>(c_pos),
-                             _mm_loadu_si128(reinterpret_cast<const __m128i*>(a_pos)) / divider);
-
-            a_pos += values_per_sse_register;
-            c_pos += values_per_sse_register;
-        }
-#endif
-
-        while (a_pos < a_end) {
-            *c_pos = *a_pos / divider;
-            ++a_pos;
-            ++c_pos;
-        }
-    }
-};
-
-/** Specializations are specified for dividing numbers of the type UInt64 and UInt32 by the numbers of the same sign.
-  * Can be expanded to all possible combinations, but more code is needed.
-  */
-
-template <>
-struct BinaryOperationImpl<UInt64, UInt8, DivideIntegralImpl<UInt64, UInt8>>
-        : DivideIntegralByConstantImpl<UInt64, UInt8> {};
-template <>
-struct BinaryOperationImpl<UInt64, UInt16, DivideIntegralImpl<UInt64, UInt16>>
-        : DivideIntegralByConstantImpl<UInt64, UInt16> {};
-template <>
-struct BinaryOperationImpl<UInt64, UInt32, DivideIntegralImpl<UInt64, UInt32>>
-        : DivideIntegralByConstantImpl<UInt64, UInt32> {};
-template <>
-struct BinaryOperationImpl<UInt64, UInt64, DivideIntegralImpl<UInt64, UInt64>>
-        : DivideIntegralByConstantImpl<UInt64, UInt64> {};
-
-template <>
-struct BinaryOperationImpl<UInt32, UInt8, DivideIntegralImpl<UInt32, UInt8>>
-        : DivideIntegralByConstantImpl<UInt32, UInt8> {};
-template <>
-struct BinaryOperationImpl<UInt32, UInt16, DivideIntegralImpl<UInt32, UInt16>>
-        : DivideIntegralByConstantImpl<UInt32, UInt16> {};
-template <>
-struct BinaryOperationImpl<UInt32, UInt32, DivideIntegralImpl<UInt32, UInt32>>
-        : DivideIntegralByConstantImpl<UInt32, UInt32> {};
-template <>
-struct BinaryOperationImpl<UInt32, UInt64, DivideIntegralImpl<UInt32, UInt64>>
-        : DivideIntegralByConstantImpl<UInt32, UInt64> {};
-
-template <>
-struct BinaryOperationImpl<Int64, Int8, DivideIntegralImpl<Int64, Int8>>
-        : DivideIntegralByConstantImpl<Int64, Int8> {};
-template <>
-struct BinaryOperationImpl<Int64, Int16, DivideIntegralImpl<Int64, Int16>>
-        : DivideIntegralByConstantImpl<Int64, Int16> {};
-template <>
-struct BinaryOperationImpl<Int64, Int32, DivideIntegralImpl<Int64, Int32>>
-        : DivideIntegralByConstantImpl<Int64, Int32> {};
-template <>
-struct BinaryOperationImpl<Int64, Int64, DivideIntegralImpl<Int64, Int64>>
-        : DivideIntegralByConstantImpl<Int64, Int64> {};
-
-template <>
-struct BinaryOperationImpl<Int32, Int8, DivideIntegralImpl<Int32, Int8>>
-        : DivideIntegralByConstantImpl<Int32, Int8> {};
-template <>
-struct BinaryOperationImpl<Int32, Int16, DivideIntegralImpl<Int32, Int16>>
-        : DivideIntegralByConstantImpl<Int32, Int16> {};
-template <>
-struct BinaryOperationImpl<Int32, Int32, DivideIntegralImpl<Int32, Int32>>
-        : DivideIntegralByConstantImpl<Int32, Int32> {};
-template <>
-struct BinaryOperationImpl<Int32, Int64, DivideIntegralImpl<Int32, Int64>>
-        : DivideIntegralByConstantImpl<Int32, Int64> {};
-
 struct NameIntDiv {
     static constexpr auto name = "int_divide";
 };
-using FunctionIntDiv = FunctionBinaryArithmeticToNullType<DivideIntegralImpl, NameIntDiv, false>;
+using FunctionIntDiv = FunctionBinaryArithmetic<DivideIntegralImpl, NameIntDiv, true>;
 
 void register_function_int_div(SimpleFunctionFactory& factory) {
     factory.register_function<FunctionIntDiv>();
diff --git a/be/src/vec/functions/int_div.h b/be/src/vec/functions/int_div.h
index 1bfebcb538..2357eb5af1 100644
--- a/be/src/vec/functions/int_div.h
+++ b/be/src/vec/functions/int_div.h
@@ -20,19 +20,49 @@
 
 #pragma once
 
+#include <libdivide.h>
+
+#include <type_traits>
+
+#include "vec/columns/column_decimal.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/core/types.h"
 #include "vec/data_types/number_traits.h"
+#include "vec/functions/function_binary_arithmetic.h"
 
 namespace doris::vectorized {
 
 template <typename A, typename B>
 struct DivideIntegralImpl {
     using ResultType = typename NumberTraits::ResultOfIntegerDivision<A, B>::Type;
+    using Traits = NumberTraits::BinaryOperatorTraits<A, B>;
+
+    template <typename Result = ResultType>
+    static void apply(const typename Traits::ArrayA& a, B b,
+                      typename ColumnVector<Result>::Container& c,
+                      typename Traits::ArrayNull& null_map) {
+        size_t size = c.size();
+        UInt8 is_null = b == 0;
+        memset(null_map.data(), is_null, size);
+
+        if (!is_null) {
+            if constexpr (!std::is_floating_point_v<A> && !std::is_same_v<A, Int128> &&
+                          !std::is_same_v<A, Int8> && !std::is_same_v<A, UInt8>) {
+                for (size_t i = 0; i < size; i++) {
+                    c[i] = a[i] / libdivide::divider<A>(b);
+                }
+            } else {
+                for (size_t i = 0; i < size; i++) {
+                    c[i] = a[i] / b;
+                }
+            }
+        }
+    }
 
     template <typename Result = ResultType>
-    static inline Result apply(A a, B b, NullMap& null_map, size_t index) {
-        null_map[index] = b == 0;
-        return a / (b + null_map[index]);
+    static inline Result apply(A a, B b, UInt8& is_null) {
+        is_null = b == 0;
+        return a / (b + is_null);
     }
 };
 
diff --git a/be/src/vec/functions/math.cpp b/be/src/vec/functions/math.cpp
index 1aedd83187..1e6952e1f6 100644
--- a/be/src/vec/functions/math.cpp
+++ b/be/src/vec/functions/math.cpp
@@ -15,10 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "vec/core/types.h"
 #include "vec/data_types/number_traits.h"
 #include "vec/functions/function_const.h"
 #include "vec/functions/function_binary_arithmetic.h"
-#include "vec/functions/function_binary_arithmetic_to_null_type.h"
+#include "vec/functions/function_binary_arithmetic.h"
 #include "vec/functions/function_math_unary.h"
 #include "vec/functions/function_math_unary_to_null_type.h"
 #include "vec/functions/function_string.h"
@@ -150,17 +151,39 @@ struct LogName {
 template <typename A, typename B>
 struct LogImpl {
     using ResultType = Float64;
+    using Traits = NumberTraits::BinaryOperatorTraits<A, B>;
+
     static const constexpr bool allow_decimal = false;
+    static constexpr double EPSILON = 1e-9;
+
+    template <typename Result = ResultType>
+    static void apply(const typename Traits::ArrayA& a, B b,
+                      typename ColumnVector<Result>::Container& c,
+                      typename Traits::ArrayNull& null_map) {
+        size_t size = c.size();
+        UInt8 is_null = b <= 0;
+        memset(null_map.data(), is_null, size);
+
+        if (!is_null) {
+            for (size_t i = 0; i < size; i++) {
+                if (a[i] <= 0 || std::fabs(a[i] - 1.0) < EPSILON) {
+                    null_map[i] = 1;
+                } else {
+                    c[i] = static_cast<Float64>(std::log(static_cast<Float64>(b)) /
+                                                std::log(static_cast<Float64>(a[i])));
+                }
+            }
+        }
+    }
 
     template <typename Result>
-    static inline Result apply(A a, B b, NullMap& null_map, size_t index) {
-        constexpr double EPSILON = 1e-9;
-        null_map[index] = a <= 0 || b <= 0 || std::fabs(a - 1.0) < EPSILON;
+    static inline Result apply(A a, B b, UInt8& is_null) {
+        is_null = a <= 0 || b <= 0 || std::fabs(a - 1.0) < EPSILON;
         return static_cast<Float64>(std::log(static_cast<Float64>(b)) /
                                     std::log(static_cast<Float64>(a)));
     }
 };
-using FunctionLog = FunctionBinaryArithmeticToNullType<LogImpl, LogName>;
+using FunctionLog = FunctionBinaryArithmetic<LogImpl, LogName, true>;
 
 struct CeilName {
     static constexpr auto name = "ceil";
@@ -357,7 +380,7 @@ struct PowImpl {
 struct PowName {
     static constexpr auto name = "pow";
 };
-using FunctionPow = FunctionBinaryArithmetic<PowImpl, PowName>;
+using FunctionPow = FunctionBinaryArithmetic<PowImpl, PowName, false>;
 
 template <typename A, typename B>
 struct TruncateImpl {
@@ -374,7 +397,7 @@ struct TruncateImpl {
 struct TruncateName {
     static constexpr auto name = "truncate";
 };
-using FunctionTruncate = FunctionBinaryArithmetic<TruncateImpl, TruncateName>;
+using FunctionTruncate = FunctionBinaryArithmetic<TruncateImpl, TruncateName, false>;
 
 /// round(double,int32)-->double
 /// key_str:roundFloat64Int32
@@ -395,7 +418,7 @@ struct RoundTwoImpl {
                 my_double_round(static_cast<Float64>(a), static_cast<Int32>(b), false, false));
     }
 };
-using FunctionRoundTwo = FunctionBinaryArithmetic<RoundTwoImpl, RoundName>;
+using FunctionRoundTwo = FunctionBinaryArithmetic<RoundTwoImpl, RoundName, false>;
 
 // TODO: Now math may cause one thread compile time too long, because the function in math
 // so mush. Split it to speed up compile time in the future
diff --git a/be/src/vec/functions/minus.cpp b/be/src/vec/functions/minus.cpp
index 52b86a1085..e282935729 100644
--- a/be/src/vec/functions/minus.cpp
+++ b/be/src/vec/functions/minus.cpp
@@ -49,7 +49,7 @@ struct MinusImpl {
 struct NameMinus {
     static constexpr auto name = "subtract";
 };
-using FunctionMinus = FunctionBinaryArithmetic<MinusImpl, NameMinus>;
+using FunctionMinus = FunctionBinaryArithmetic<MinusImpl, NameMinus, false>;
 
 void register_function_minus(SimpleFunctionFactory& factory) {
     factory.register_function<FunctionMinus>();
diff --git a/be/src/vec/functions/modulo.cpp b/be/src/vec/functions/modulo.cpp
index 28b0ec768d..599790d0d4 100644
--- a/be/src/vec/functions/modulo.cpp
+++ b/be/src/vec/functions/modulo.cpp
@@ -18,16 +18,12 @@
 // https://github.com/ClickHouse/ClickHouse/blob/master/src/Functions/Modulo.cpp
 // and modified by Doris
 
-#include "runtime/decimalv2_value.h"
-#ifdef __SSE2__
-#define LIBDIVIDE_SSE2 1
-#endif
-
 #include <libdivide.h>
 
 #include "common/status.h"
+#include "runtime/decimalv2_value.h"
+#include "vec/core/types.h"
 #include "vec/functions/function_binary_arithmetic.h"
-#include "vec/functions/function_binary_arithmetic_to_null_type.h"
 #include "vec/functions/simple_function_factory.h"
 
 namespace doris::vectorized {
@@ -35,144 +31,90 @@ namespace doris::vectorized {
 template <typename A, typename B>
 struct ModuloImpl {
     using ResultType = typename NumberTraits::ResultOfModulo<A, B>::Type;
+    using Traits = NumberTraits::BinaryOperatorTraits<A, B>;
+
+    template <typename Result = ResultType>
+    static void apply(const typename Traits::ArrayA& a, B b,
+                      typename ColumnVector<Result>::Container& c,
+                      typename Traits::ArrayNull& null_map) {
+        size_t size = c.size();
+        UInt8 is_null = b == 0;
+        memset(null_map.data(), is_null, sizeof(UInt8) * size);
+
+        if (!is_null) {
+            for (size_t i = 0; i < size; i++) {
+                if constexpr (std::is_floating_point_v<ResultType>) {
+                    c[i] = std::fmod((double)a[i], (double)b);
+                } else {
+                    c[i] = a[i] % b;
+                }
+            }
+        }
+    }
 
     template <typename Result = ResultType>
-    static inline Result apply(A a, B b, NullMap& null_map, size_t index) {
+    static inline Result apply(A a, B b, UInt8& is_null) {
+        is_null = b == 0;
+        b += is_null;
+
         if constexpr (std::is_floating_point_v<Result>) {
-            null_map[index] = b == 0;
             return std::fmod((double)a, (double)b);
         } else {
-            null_map[index] = b == 0;
-            return typename NumberTraits::ToInteger<A>::Type(a) %
-                   (typename NumberTraits::ToInteger<B>::Type(b) + (b == 0));
+            return a % b;
         }
     }
 
     template <typename Result = DecimalV2Value>
-    static inline DecimalV2Value apply(DecimalV2Value a, DecimalV2Value b, NullMap& null_map,
-                                       size_t index) {
-        null_map[index] = b == DecimalV2Value(0);
-        return a % (b + DecimalV2Value(b == DecimalV2Value(0)));
+    static inline DecimalV2Value apply(DecimalV2Value a, DecimalV2Value b, UInt8& is_null) {
+        is_null = b == DecimalV2Value(0);
+        return a % (b + DecimalV2Value(is_null));
     }
 };
 
 template <typename A, typename B>
 struct PModuloImpl {
     using ResultType = typename NumberTraits::ResultOfModulo<A, B>::Type;
+    using Traits = NumberTraits::BinaryOperatorTraits<A, B>;
+
+    template <typename Result = ResultType>
+    static void apply(const typename Traits::ArrayA& a, B b,
+                      typename ColumnVector<Result>::Container& c,
+                      typename Traits::ArrayNull& null_map) {
+        size_t size = c.size();
+        UInt8 is_null = b == 0;
+        memset(null_map.data(), is_null, size);
+
+        if (!is_null) {
+            for (size_t i = 0; i < size; i++) {
+                if constexpr (std::is_floating_point_v<ResultType>) {
+                    c[i] = std::fmod(std::fmod((double)a[i], (double)b) + (double)b, double(b));
+                } else {
+                    c[i] = (a[i] % b + b) % b;
+                }
+            }
+        }
+    }
 
     template <typename Result = ResultType>
-    static inline Result apply(A a, B b, NullMap& null_map, size_t index) {
+    static inline Result apply(A a, B b, UInt8& is_null) {
+        is_null = b == 0;
+        b += is_null;
+
         if constexpr (std::is_floating_point_v<Result>) {
-            null_map[index] = 0;
             return std::fmod(std::fmod((double)a, (double)b) + (double)b, (double)b);
         } else {
-            null_map[index] = b == 0;
-            return (typename NumberTraits::ToInteger<A>::Type(a) %
-                            (typename NumberTraits::ToInteger<B>::Type(b) + (b == 0)) +
-                    typename NumberTraits::ToInteger<B>::Type(b)) %
-                   (typename NumberTraits::ToInteger<B>::Type(b) + (b == 0));
+            return (a % b + b) % b;
         }
     }
 
     template <typename Result = DecimalV2Value>
-    static inline DecimalV2Value apply(DecimalV2Value a, DecimalV2Value b, NullMap& null_map,
-                                       size_t index) {
-        null_map[index] = b == DecimalV2Value(0);
-        return (a % (b + DecimalV2Value(b == DecimalV2Value(0))) + b) %
-               (b + DecimalV2Value(b == DecimalV2Value(0)));
+    static inline DecimalV2Value apply(DecimalV2Value a, DecimalV2Value b, UInt8& is_null) {
+        is_null = b == DecimalV2Value(0);
+        b += DecimalV2Value(is_null);
+        return (a % b + b) % b;
     }
 };
 
-template <typename A, typename B>
-struct ModuloByConstantImpl : BinaryOperationImplBase<A, B, ModuloImpl<A, B>> {
-    using ResultType = typename ModuloImpl<A, B>::ResultType;
-
-    static void vector_constant(const PaddedPODArray<A>& a, B b, PaddedPODArray<ResultType>& c) {
-        // TODO: Support return NULL in the future
-        if (UNLIKELY(b == 0)) {
-            //        throw Exception("Division by zero", TStatusCode::VEC_ILLEGAL_DIVISION);
-            memset(c.data(), 0, sizeof(ResultType) * c.size());
-            return;
-        }
-
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-
-        if (UNLIKELY((std::is_signed_v<B> && b == -1) || b == 1)) {
-            size_t size = a.size();
-            for (size_t i = 0; i < size; ++i) c[i] = 0;
-            return;
-        }
-
-#pragma GCC diagnostic pop
-
-        libdivide::divider<A> divider(b);
-
-        /// Here we failed to make the SSE variant from libdivide give an advantage.
-        size_t size = a.size();
-        for (size_t i = 0; i < size; ++i)
-            c[i] = a[i] -
-                   (a[i] / divider) *
-                           b; /// NOTE: perhaps, the division semantics with the remainder of negative numbers is not preserved.
-    }
-};
-
-/** Specializations are specified for dividing numbers of the type UInt64 and UInt32 by the numbers of the same sign.
-  * Can be expanded to all possible combinations, but more code is needed.
-  */
-
-template <>
-struct BinaryOperationImpl<UInt64, UInt8, ModuloImpl<UInt64, UInt8>>
-        : ModuloByConstantImpl<UInt64, UInt8> {};
-template <>
-struct BinaryOperationImpl<UInt64, UInt16, ModuloImpl<UInt64, UInt16>>
-        : ModuloByConstantImpl<UInt64, UInt16> {};
-template <>
-struct BinaryOperationImpl<UInt64, UInt32, ModuloImpl<UInt64, UInt32>>
-        : ModuloByConstantImpl<UInt64, UInt32> {};
-template <>
-struct BinaryOperationImpl<UInt64, UInt64, ModuloImpl<UInt64, UInt64>>
-        : ModuloByConstantImpl<UInt64, UInt64> {};
-
-template <>
-struct BinaryOperationImpl<UInt32, UInt8, ModuloImpl<UInt32, UInt8>>
-        : ModuloByConstantImpl<UInt32, UInt8> {};
-template <>
-struct BinaryOperationImpl<UInt32, UInt16, ModuloImpl<UInt32, UInt16>>
-        : ModuloByConstantImpl<UInt32, UInt16> {};
-template <>
-struct BinaryOperationImpl<UInt32, UInt32, ModuloImpl<UInt32, UInt32>>
-        : ModuloByConstantImpl<UInt32, UInt32> {};
-template <>
-struct BinaryOperationImpl<UInt32, UInt64, ModuloImpl<UInt32, UInt64>>
-        : ModuloByConstantImpl<UInt32, UInt64> {};
-
-template <>
-struct BinaryOperationImpl<Int64, Int8, ModuloImpl<Int64, Int8>>
-        : ModuloByConstantImpl<Int64, Int8> {};
-template <>
-struct BinaryOperationImpl<Int64, Int16, ModuloImpl<Int64, Int16>>
-        : ModuloByConstantImpl<Int64, Int16> {};
-template <>
-struct BinaryOperationImpl<Int64, Int32, ModuloImpl<Int64, Int32>>
-        : ModuloByConstantImpl<Int64, Int32> {};
-template <>
-struct BinaryOperationImpl<Int64, Int64, ModuloImpl<Int64, Int64>>
-        : ModuloByConstantImpl<Int64, Int64> {};
-
-template <>
-struct BinaryOperationImpl<Int32, Int8, ModuloImpl<Int32, Int8>>
-        : ModuloByConstantImpl<Int32, Int8> {};
-template <>
-struct BinaryOperationImpl<Int32, Int16, ModuloImpl<Int32, Int16>>
-        : ModuloByConstantImpl<Int32, Int16> {};
-template <>
-struct BinaryOperationImpl<Int32, Int32, ModuloImpl<Int32, Int32>>
-        : ModuloByConstantImpl<Int32, Int32> {};
-template <>
-struct BinaryOperationImpl<Int32, Int64, ModuloImpl<Int32, Int64>>
-        : ModuloByConstantImpl<Int32, Int64> {};
-
 struct NameModulo {
     static constexpr auto name = "mod";
 };
@@ -180,8 +122,8 @@ struct NamePModulo {
     static constexpr auto name = "pmod";
 };
 
-using FunctionModulo = FunctionBinaryArithmeticToNullType<ModuloImpl, NameModulo, false>;
-using FunctionPModulo = FunctionBinaryArithmeticToNullType<PModuloImpl, NamePModulo, false>;
+using FunctionModulo = FunctionBinaryArithmetic<ModuloImpl, NameModulo, true>;
+using FunctionPModulo = FunctionBinaryArithmetic<PModuloImpl, NamePModulo, true>;
 
 void register_function_modulo(SimpleFunctionFactory& factory) {
     factory.register_function<FunctionModulo>();
diff --git a/be/src/vec/functions/multiply.cpp b/be/src/vec/functions/multiply.cpp
index b2840e42e5..95475ed847 100644
--- a/be/src/vec/functions/multiply.cpp
+++ b/be/src/vec/functions/multiply.cpp
@@ -48,7 +48,7 @@ struct MultiplyImpl {
 struct NameMultiply {
     static constexpr auto name = "multiply";
 };
-using FunctionMultiply = FunctionBinaryArithmetic<MultiplyImpl, NameMultiply>;
+using FunctionMultiply = FunctionBinaryArithmetic<MultiplyImpl, NameMultiply, false>;
 
 void register_function_multiply(SimpleFunctionFactory& factory) {
     factory.register_function<FunctionMultiply>();
diff --git a/be/src/vec/functions/plus.cpp b/be/src/vec/functions/plus.cpp
index 0da30dfa20..d573d3a3d6 100644
--- a/be/src/vec/functions/plus.cpp
+++ b/be/src/vec/functions/plus.cpp
@@ -50,7 +50,7 @@ struct PlusImpl {
 struct NamePlus {
     static constexpr auto name = "add";
 };
-using FunctionPlus = FunctionBinaryArithmetic<PlusImpl, NamePlus>;
+using FunctionPlus = FunctionBinaryArithmetic<PlusImpl, NamePlus, false>;
 
 void register_function_plus(SimpleFunctionFactory& factory) {
     factory.register_function<FunctionPlus>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org