Posted to commits@doris.apache.org by li...@apache.org on 2020/08/19 02:29:27 UTC

[incubator-doris] branch master updated: Support udaf_orthogonal_bitmap (#4198)

This is an automated email from the ASF dual-hosted git repository.

lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f924282  Support udaf_orthogonal_bitmap (#4198)
f924282 is described below

commit f92428248f91a191294bc2a8a2deb1c209250acc
Author: zhbinbin <16...@users.noreply.github.com>
AuthorDate: Wed Aug 19 10:29:13 2020 +0800

    Support udaf_orthogonal_bitmap (#4198)
    
    The original Doris bitmap aggregation functions perform poorly when computing intersections and unions of bitmaps whose cardinality exceeds one billion. There are two reasons for this. First, when the bitmap cardinality is large and the data size exceeds 1 GB, network and disk I/O time increases. Second, all the data sunk by the backend (BE) instances is transferred to the top node for the intersection and union calculation, which leads to the pressu [...]
    
    The solution is to create a table with a fixed schema based on the Doris fragmentation rule and to hash-partition the bitmap's ID range, that is, to cut the ID range vertically into small cubes. The resulting bitmap blocks are smaller and evenly distributed across all backend (BE) instances. On top of this schema, new high-performance UDAF aggregation functions are provided: all scan nodes take part in the intersection and union calculation, and the top node only summarizes the results.
    
    The design goal is to support bitmap cardinalities above 10 billion and to keep the response time of intersection and union calculations at a granularity of 100 dimensions within 5 s.
    
    There are three udaf functions in this commit: orthogonal_bitmap_intersect_count, orthogonal_bitmap_union_count, orthogonal_bitmap_intersect.
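
    The partitioning idea above can be illustrated with a minimal C++ sketch. It is not the code from this commit: std::set stands in for a roaring bitmap, the modulo rule in shard_of() is an assumed placeholder for the real hash rule, and every name below is hypothetical. Because each ID lands in exactly one shard, the per-shard intersection and union counts can be computed independently and simply added up by the top node.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical stand-in for a roaring bitmap: an ordered set of 64-bit IDs.
using IdSet = std::set<uint64_t>;

// Assumed placeholder for the hash rule: a plain modulo over the ID.
inline size_t shard_of(uint64_t id, size_t num_shards) {
    return static_cast<size_t>(id % num_shards);
}

// Split one logical bitmap into num_shards disjoint ("orthogonal") pieces.
inline std::vector<IdSet> partition_ids(const IdSet& ids, size_t num_shards) {
    std::vector<IdSet> shards(num_shards);
    for (uint64_t id : ids) {
        shards[shard_of(id, num_shards)].insert(id);
    }
    return shards;
}

// Work done locally on one shard: count the IDs present in both pieces.
inline uint64_t local_intersect_count(const IdSet& a, const IdSet& b) {
    uint64_t n = 0;
    for (uint64_t id : a) {
        n += b.count(id);
    }
    return n;
}

// Work done locally on one shard: |A or B| = |A| + |B| - |A and B|.
inline uint64_t local_union_count(const IdSet& a, const IdSet& b) {
    return a.size() + b.size() - local_intersect_count(a, b);
}

// Work done on the top node: sum the per-shard intersection counts.
inline uint64_t sharded_intersect_count(const std::vector<IdSet>& a,
                                        const std::vector<IdSet>& b) {
    assert(a.size() == b.size());
    uint64_t total = 0;
    for (size_t i = 0; i < a.size(); ++i) {
        total += local_intersect_count(a[i], b[i]);
    }
    return total;
}

// Work done on the top node: sum the per-shard union counts.
inline uint64_t sharded_union_count(const std::vector<IdSet>& a,
                                    const std::vector<IdSet>& b) {
    assert(a.size() == b.size());
    uint64_t total = 0;
    for (size_t i = 0; i < a.size(); ++i) {
        total += local_union_count(a[i], b[i]);
    }
    return total;
}
```

    Summing is only valid because both inputs are partitioned with the same rule and the same shard count; with a different rule per input, an ID could be counted in one shard of A and a different shard of B.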
---
 contrib/udf/CMakeLists.txt                         |    1 +
 .../udf/src/udaf_orthogonal_bitmap/CMakeLists.txt  |   92 ++
 .../udf/src/udaf_orthogonal_bitmap/bitmap_value.h  | 1326 ++++++++++++++++++++
 .../orthogonal_bitmap_function.cpp                 |  492 ++++++++
 .../orthogonal_bitmap_function.h                   |   62 +
 .../udf/src/udaf_orthogonal_bitmap/string_value.h  |  175 +++
 docs/.vuepress/sidebar/en.js                       |    4 +-
 docs/.vuepress/sidebar/zh-CN.js                    |    4 +-
 .../udf/contrib/udaf-orthogonal-bitmap-manual.md   |  249 ++++
 .../udf/contrib/udaf-orthogonal-bitmap-manual.md   |  238 ++++
 10 files changed, 2641 insertions(+), 2 deletions(-)

diff --git a/contrib/udf/CMakeLists.txt b/contrib/udf/CMakeLists.txt
index e0feef1..8554516 100644
--- a/contrib/udf/CMakeLists.txt
+++ b/contrib/udf/CMakeLists.txt
@@ -72,5 +72,6 @@ set_target_properties(udf PROPERTIES IMPORTED_LOCATION $ENV{DORIS_HOME}/output/u
 
 # Add the subdirector of new UDF in here
 add_subdirectory(${SRC_DIR}/udf_samples)
+add_subdirectory(${SRC_DIR}/udaf_orthogonal_bitmap)
 
 install(DIRECTORY DESTINATION ${OUTPUT_DIR})
diff --git a/contrib/udf/src/udaf_orthogonal_bitmap/CMakeLists.txt b/contrib/udf/src/udaf_orthogonal_bitmap/CMakeLists.txt
new file mode 100644
index 0000000..5741509
--- /dev/null
+++ b/contrib/udf/src/udaf_orthogonal_bitmap/CMakeLists.txt
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# where to put generated libraries
+set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/udaf_orthogonal_bitmap")
+
+# where to put generated binaries
+set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/udaf_orthogonal_bitmap")
+
+
+# set CMAKE_BUILD_TARGET_ARCH
+# `lscpu | grep 'Architecture' | awk '{print $2}'` only works when the system locale is en_US.UTF-8, so use `uname -m` instead
+execute_process(COMMAND bash "-c" "uname -m"
+                OUTPUT_VARIABLE
+                CMAKE_BUILD_TARGET_ARCH
+                OUTPUT_STRIP_TRAILING_WHITESPACE)
+message(STATUS "Build target arch is ${CMAKE_BUILD_TARGET_ARCH}")
+
+# Set dirs
+set(SRC_DIR "$ENV{DORIS_HOME}/be/src/")
+set(THIRDPARTY_DIR "$ENV{DORIS_THIRDPARTY}/installed/")
+
+# Set include dirs
+include_directories(./)
+include_directories(${THIRDPARTY_DIR}/include/)
+
+# message
+message(STATUS "base dir is ${BASE_DIR}")
+message(STATUS "doris home dir is $ENV{DORIS_HOME}")
+message(STATUS "src dir is ${SRC_DIR}")
+message(STATUS "libroaring dir is ${THIRDPARTY_DIR}/lib/libroaring.a")
+message(STATUS "thirdparty dir is $ENV{DORIS_THIRDPARTY}")
+
+# Set all libraries
+add_library(roaring STATIC IMPORTED)
+set_target_properties(roaring PROPERTIES IMPORTED_LOCATION
+        ${THIRDPARTY_DIR}/lib/libroaring.a)
+
+# Set FLAGS
+set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall -Wno-sign-compare -Wno-unknown-pragmas -pthread")
+set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -fno-strict-aliasing -fno-omit-frame-pointer")
+set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -std=gnu++11 -D__STDC_FORMAT_MACROS")
+set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated -Wno-vla")
+set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unused-function")
+if ("${CMAKE_BUILD_TARGET_ARCH}" STREQUAL "x86" OR "${CMAKE_BUILD_TARGET_ARCH}" STREQUAL "x86_64")
+    set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -msse4.2")
+endif()
+set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS}  -Wno-attributes -DS2_USE_GFLAGS -DS2_USE_GLOG")
+
+# For any gcc builds:
+#   -g: Enable symbols for profiler tools
+#   -Wno-unused-local-typedefs: Do not warn for local typedefs that are unused.
+set(CXX_GCC_FLAGS "-g -Wno-unused-local-typedefs -O0 -gdwarf-2 -DNDEBUG")
+
+SET(CMAKE_CXX_FLAGS ${CXX_GCC_FLAGS})
+
+SET(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}")
+
+message(STATUS "Compiler Flags: ${CMAKE_CXX_FLAGS}")
+
+# static link gcc's lib
+set(LINK_LIBS
+  -Wl,--whole-archive
+  roaring
+  udf
+  -Wl,--no-whole-archive
+  -static-libstdc++
+  -static-libgcc
+)
+
+set(DIR_SRCS 
+   ./orthogonal_bitmap_function.cpp
+)
+
+add_library(udaf_orthogonal_bitmap SHARED ${DIR_SRCS})
+target_link_libraries(udaf_orthogonal_bitmap
+  ${LINK_LIBS}
+)
diff --git a/contrib/udf/src/udaf_orthogonal_bitmap/bitmap_value.h b/contrib/udf/src/udaf_orthogonal_bitmap/bitmap_value.h
new file mode 100644
index 0000000..4264069
--- /dev/null
+++ b/contrib/udf/src/udaf_orthogonal_bitmap/bitmap_value.h
@@ -0,0 +1,1326 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_CONTRIB_UDF_SRC_UDAF_BITMAP_BITMAP_VALUE_H
+#define DORIS_CONTRIB_UDF_SRC_UDAF_BITMAP_BITMAP_VALUE_H
+
+#include <algorithm>
+#include <cstdarg>
+#include <cstdio>
+#include <limits>
+#include <map>
+#include <sstream>
+#include <vector>
+#include <new>
+#include <numeric>
+#include <roaring/roaring.hh>
+#include <stdexcept>
+#include <string>
+#include <utility>
+
+namespace doris_udf {
+
+inline void encode_fixed32_le(uint8_t* buf, uint32_t val) {
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+    memcpy(buf, &val, sizeof(val));
+#else
+    uint32_t res = bswap_32(val);
+    memcpy(buf, &res, sizeof(res));
+#endif
+}
+
+inline void encode_fixed64_le(uint8_t* buf, uint64_t val) {
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+    memcpy(buf, &val, sizeof(val));
+#else
+    uint64_t res = gbswap_64(val);
+    memcpy(buf, &res, sizeof(res));
+#endif
+}
+
+inline uint64_t decode_fixed64_le(const uint8_t* buf) {
+    uint64_t res;
+    memcpy(&res, buf, sizeof(res));
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+    return res;
+#else
+    return gbswap_64(res);
+#endif
+}
+
+inline uint8_t* encode_varint64(uint8_t* dst, uint64_t v) {
+    static const unsigned int B = 128;
+    while (v >= B) {
+        *(dst++) = (v & (B - 1)) | B;
+        v >>= 7;
+    }
+    *(dst++) = static_cast<unsigned char>(v);
+    return dst;
+}
+
+inline const uint8_t* decode_varint64_ptr(const uint8_t* p, const uint8_t* limit, uint64_t* value) {
+    uint64_t result = 0;
+    for (uint32_t shift = 0; shift <= 63 && p < limit; shift += 7) {
+        uint64_t byte = *p;
+        p++;
+        if (byte & 128) {
+            // More bytes are present
+            result |= ((byte & 127) << shift);
+        } else {
+            result |= (byte << shift);
+            *value = result;
+            return p;
+        }
+    }
+    return nullptr;
+}
+
+inline uint32_t decode_fixed32_le(const uint8_t* buf) {
+    uint32_t res;
+    memcpy(&res, buf, sizeof(res));
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+    return res;
+#else
+    return bswap_32(res);
+#endif
+}
+
+// Returns the length of the varint32 or varint64 encoding of "v"
+inline int varint_length(uint64_t v) {
+    int len = 1;
+    while (v >= 128) {
+        v >>= 7;
+        len++;
+    }
+    return len;
+}
+
+
+// serialized bitmap := TypeCode(1), Payload
+// The format of payload depends on value of TypeCode which is defined below
+struct BitmapTypeCode {
+    enum type {
+        // An empty bitmap. Payload is 0 byte.
+        // added in 0.11
+        EMPTY = 0,
+        // A bitmap containing only one element that is in [0, UINT32_MAX]
+        // Payload := UInt32LittleEndian(4 byte)
+        // added in 0.11
+        SINGLE32 = 1,
+        // A bitmap whose maximum element is in [0, UINT32_MAX]
+        // Payload := the standard RoaringBitmap format described by
+        // https://github.com/RoaringBitmap/RoaringFormatSpec/
+        // added in 0.11
+        BITMAP32 = 2,
+        // A bitmap containing only one element that is in (UINT32_MAX, UINT64_MAX]
+        // Payload := UInt64LittleEndian(8 byte)
+        // added in 0.12
+        SINGLE64 = 3,
+        // A bitmap whose maximum element is in (UINT32_MAX, UINT64_MAX].
+        //
+        // To support 64-bits elements, all elements with the same high 32 bits are stored in a
+        // RoaringBitmap containing only the lower 32 bits. Thus we could use
+        // map<uint32_t, RoaringBitmap> to represent bitmap of 64-bits ints.
+        //
+        // Since there is no standard format for 64-bits RoaringBitmap, we define our own as below
+        // Payload := NumRoaring(vint64), { MapKey, MapValue }^NumRoaring
+        // - MapKey := the shared high 32 bits in UInt32LittleEndian(4 byte)
+        // - MapValue := the standard RoaringBitmap format
+        //
+        // added in 0.12
+        BITMAP64 = 4
+    };
+};
+
+namespace detail {
+
+class Roaring64MapSetBitForwardIterator;
+
+// Forked from https://github.com/RoaringBitmap/CRoaring/blob/v0.2.60/cpp/roaring64map.hh
+// What we change includes
+// - a custom serialization format is used inside read()/write()/getSizeInBytes()
+// - added clear() and is32BitsEnough()
+class Roaring64Map {
+public:
+    /**
+     * Create an empty bitmap
+     */
+    Roaring64Map() = default;
+
+    /**
+     * Construct a bitmap from a list of 32-bit integer values.
+     */
+    Roaring64Map(size_t n, const uint32_t* data) { addMany(n, data); }
+
+    /**
+     * Construct a bitmap from a list of 64-bit integer values.
+     */
+    Roaring64Map(size_t n, const uint64_t* data) { addMany(n, data); }
+
+    /**
+     * Construct a 64-bit map from a 32-bit one
+     */
+    Roaring64Map(const Roaring& r) { emplaceOrInsert(0, r); }
+
+    /**
+     * Construct a roaring object from the C struct.
+     *
+     * Passing a NULL pointer is unsafe.
+     */
+    Roaring64Map(roaring_bitmap_t* s) { emplaceOrInsert(0, s); }
+
+    /**
+     * Construct a bitmap from a list of integer values.
+     */
+    static Roaring64Map bitmapOf(size_t n...) {
+        Roaring64Map ans;
+        va_list vl;
+        va_start(vl, n);
+        for (size_t i = 0; i < n; i++) {
+            ans.add(va_arg(vl, uint64_t));
+        }
+        va_end(vl);
+        return ans;
+    }
+
+    /**
+     * Add value x
+     *
+     */
+    void add(uint32_t x) {
+        roarings[0].add(x);
+        roarings[0].setCopyOnWrite(copyOnWrite);
+    }
+    void add(uint64_t x) {
+        roarings[highBytes(x)].add(lowBytes(x));
+        roarings[highBytes(x)].setCopyOnWrite(copyOnWrite);
+    }
+
+    /**
+     * Add value x
+     * Returns true if a new value was added, false if the value was already present.
+     */
+    bool addChecked(uint32_t x) {
+        bool result = roarings[0].addChecked(x);
+        roarings[0].setCopyOnWrite(copyOnWrite);
+        return result;
+    }
+    bool addChecked(uint64_t x) {
+        bool result = roarings[highBytes(x)].addChecked(lowBytes(x));
+        roarings[highBytes(x)].setCopyOnWrite(copyOnWrite);
+        return result;
+    }
+
+    /**
+     * Add n_args values from pointer vals
+     *
+     */
+    void addMany(size_t n_args, const uint32_t* vals) {
+        for (size_t lcv = 0; lcv < n_args; lcv++) {
+            roarings[0].add(vals[lcv]);
+            roarings[0].setCopyOnWrite(copyOnWrite);
+        }
+    }
+    void addMany(size_t n_args, const uint64_t* vals) {
+        for (size_t lcv = 0; lcv < n_args; lcv++) {
+            roarings[highBytes(vals[lcv])].add(lowBytes(vals[lcv]));
+            roarings[highBytes(vals[lcv])].setCopyOnWrite(copyOnWrite);
+        }
+    }
+
+    /**
+     * Remove value x
+     *
+     */
+    void remove(uint32_t x) { roarings[0].remove(x); }
+    void remove(uint64_t x) {
+        auto roaring_iter = roarings.find(highBytes(x));
+        if (roaring_iter != roarings.cend()) roaring_iter->second.remove(lowBytes(x));
+    }
+
+    /**
+     * Remove value x
+     * Returns true if the value was removed, false if it was not present.
+     */
+    bool removeChecked(uint32_t x) { return roarings[0].removeChecked(x); }
+    bool removeChecked(uint64_t x) {
+        auto roaring_iter = roarings.find(highBytes(x));
+        if (roaring_iter != roarings.cend()) return roaring_iter->second.removeChecked(lowBytes(x));
+        return false;
+    }
+
+    /**
+     * Return the largest value (if not empty)
+     *
+     */
+    uint64_t maximum() const {
+        for (auto roaring_iter = roarings.crbegin(); roaring_iter != roarings.crend();
+             ++roaring_iter) {
+            if (!roaring_iter->second.isEmpty()) {
+                return uniteBytes(roaring_iter->first, roaring_iter->second.maximum());
+            }
+        }
+        // we put std::numeric_limits<>::max/min in parentheses
+        // to avoid a clash with the Windows.h header under Windows
+        return (std::numeric_limits<uint64_t>::min)();
+    }
+
+    /**
+     * Return the smallest value (if not empty)
+     *
+     */
+    uint64_t minimum() const {
+        for (auto roaring_iter = roarings.cbegin(); roaring_iter != roarings.cend();
+             ++roaring_iter) {
+            if (!roaring_iter->second.isEmpty()) {
+                return uniteBytes(roaring_iter->first, roaring_iter->second.minimum());
+            }
+        }
+        // we put std::numeric_limits<>::max/min in parentheses
+        // to avoid a clash with the Windows.h header under Windows
+        return (std::numeric_limits<uint64_t>::max)();
+    }
+
+    /**
+     * Check if value x is present
+     */
+    bool contains(uint32_t x) const {
+        return roarings.count(0) == 0 ? false : roarings.at(0).contains(x);
+    }
+    bool contains(uint64_t x) const {
+        return roarings.count(highBytes(x)) == 0 ? false
+                                                 : roarings.at(highBytes(x)).contains(lowBytes(x));
+    }
+
+    /**
+     * Compute the intersection between the current bitmap and the provided
+     * bitmap,
+     * writing the result in the current bitmap. The provided bitmap is not
+     * modified.
+     */
+    Roaring64Map& operator&=(const Roaring64Map& r) {
+        for (auto& map_entry : roarings) {
+            if (r.roarings.count(map_entry.first) == 1)
+                map_entry.second &= r.roarings.at(map_entry.first);
+            else
+                map_entry.second = Roaring();
+        }
+        return *this;
+    }
+
+    /**
+     * Compute the difference between the current bitmap and the provided
+     * bitmap,
+     * writing the result in the current bitmap. The provided bitmap is not
+     * modified.
+     */
+    Roaring64Map& operator-=(const Roaring64Map& r) {
+        for (auto& map_entry : roarings) {
+            if (r.roarings.count(map_entry.first) == 1)
+                map_entry.second -= r.roarings.at(map_entry.first);
+        }
+        return *this;
+    }
+
+    /**
+     * Compute the union between the current bitmap and the provided bitmap,
+     * writing the result in the current bitmap. The provided bitmap is not
+     * modified.
+     *
+     * See also the fastunion function to aggregate many bitmaps more quickly.
+     */
+    Roaring64Map& operator|=(const Roaring64Map& r) {
+        for (const auto& map_entry : r.roarings) {
+            if (roarings.count(map_entry.first) == 0) {
+                roarings[map_entry.first] = map_entry.second;
+                roarings[map_entry.first].setCopyOnWrite(copyOnWrite);
+            } else
+                roarings[map_entry.first] |= map_entry.second;
+        }
+        return *this;
+    }
+
+    /**
+     * Compute the symmetric difference between the current bitmap and the provided
+     * bitmap,
+     * writing the result in the current bitmap. The provided bitmap is not
+     * modified.
+     */
+    Roaring64Map& operator^=(const Roaring64Map& r) {
+        for (const auto& map_entry : r.roarings) {
+            if (roarings.count(map_entry.first) == 0) {
+                roarings[map_entry.first] = map_entry.second;
+                roarings[map_entry.first].setCopyOnWrite(copyOnWrite);
+            } else
+                roarings[map_entry.first] ^= map_entry.second;
+        }
+        return *this;
+    }
+
+    /**
+     * Exchange the content of this bitmap with another.
+     */
+    void swap(Roaring64Map& r) { roarings.swap(r.roarings); }
+
+    /**
+     * Get the cardinality of the bitmap (number of elements).
+     * Throws std::length_error in the special case where the bitmap is full
+     * (cardinality() == 2^64). Check isFull() before calling to avoid
+     * exception.
+     */
+    uint64_t cardinality() const {
+        if (isFull()) {
+            throw std::length_error(
+                    "bitmap is full, cardinality is 2^64, "
+                    "unable to represent in a 64-bit integer");
+        }
+        return std::accumulate(
+                roarings.cbegin(), roarings.cend(), (uint64_t)0,
+                [](uint64_t previous, const std::pair<uint32_t, Roaring>& map_entry) {
+                    return previous + map_entry.second.cardinality();
+                });
+    }
+
+    /**
+    * Returns true if the bitmap is empty (cardinality is zero).
+    */
+    bool isEmpty() const {
+        return std::all_of(roarings.cbegin(), roarings.cend(),
+                           [](const std::pair<uint32_t, Roaring>& map_entry) {
+                               return map_entry.second.isEmpty();
+                           });
+    }
+
+    /**
+    * Returns true if the bitmap is full (cardinality is max uint64_t + 1).
+    */
+    bool isFull() const {
+        // only bother to check if map is fully saturated
+        //
+        // we put std::numeric_limits<>::max/min in parentheses
+        // to avoid a clash with the Windows.h header under Windows
+        return roarings.size() == ((size_t)(std::numeric_limits<uint32_t>::max)()) + 1
+                       ? std::all_of(roarings.cbegin(), roarings.cend(),
+                                     [](const std::pair<uint32_t, Roaring>& roaring_map_entry) {
+                                         // roarings within map are saturated if cardinality
+                                         // is uint32_t max + 1
+                                         return roaring_map_entry.second.cardinality() ==
+                                                ((uint64_t)(std::numeric_limits<uint32_t>::max)()) +
+                                                        1;
+                                     })
+                       : false;
+    }
+
+    /**
+    * Returns true if the bitmap is subset of the other.
+    */
+    bool isSubset(const Roaring64Map& r) const {
+        for (const auto& map_entry : roarings) {
+            auto roaring_iter = r.roarings.find(map_entry.first);
+            if (roaring_iter == r.roarings.cend())
+                return false;
+            else if (!map_entry.second.isSubset(roaring_iter->second))
+                return false;
+        }
+        return true;
+    }
+
+    /**
+    * Returns true if the bitmap is strict subset of the other.
+    * Throws std::length_error in the special case where the bitmap is full
+    * (cardinality() == 2^64). Check isFull() before calling to avoid exception.
+    */
+    bool isStrictSubset(const Roaring64Map& r) const {
+        return isSubset(r) && cardinality() != r.cardinality();
+    }
+
+    /**
+     * Convert the bitmap to an array. Write the output to "ans",
+     * caller is responsible to ensure that there is enough memory
+     * allocated
+     * (e.g., ans = new uint64_t[mybitmap.cardinality()];)
+     */
+    void toUint64Array(uint64_t* ans) const {
+        // Annoyingly, VS 2017 marks std::accumulate() as [[nodiscard]]
+        (void)std::accumulate(
+                roarings.cbegin(), roarings.cend(), ans,
+                [](uint64_t* previous, const std::pair<uint32_t, Roaring>& map_entry) {
+                    for (uint32_t low_bits : map_entry.second)
+                        *previous++ = uniteBytes(map_entry.first, low_bits);
+                    return previous;
+                });
+    }
+
+    /**
+     * Return true if the two bitmaps contain the same elements.
+     */
+    bool operator==(const Roaring64Map& r) const {
+        // we cannot use operator == on the map because either side may contain
+        // empty Roaring Bitmaps
+        auto lhs_iter = roarings.cbegin();
+        auto rhs_iter = r.roarings.cbegin();
+        do {
+            // if the left map has reached its end, ensure that the right map
+            // contains only empty Bitmaps
+            if (lhs_iter == roarings.cend()) {
+                while (rhs_iter != r.roarings.cend()) {
+                    if (rhs_iter->second.isEmpty()) {
+                        ++rhs_iter;
+                        continue;
+                    }
+                    return false;
+                }
+                return true;
+            }
+            // if the left map has an empty bitmap, skip it
+            if (lhs_iter->second.isEmpty()) {
+                ++lhs_iter;
+                continue;
+            }
+
+            do {
+                // if the right map has reached its end, ensure that the left
+                // map contains only empty Bitmaps
+                if (rhs_iter == r.roarings.cend()) {
+                    while (lhs_iter != roarings.cend()) {
+                        if (lhs_iter->second.isEmpty()) {
+                            ++lhs_iter;
+                            continue;
+                        }
+                        return false;
+                    }
+                    return true;
+                }
+                // if the right map has an empty bitmap, skip it
+                if (rhs_iter->second.isEmpty()) {
+                    ++rhs_iter;
+                    continue;
+                }
+            } while (false);
+            // if neither map has reached its end ensure elements are equal and
+            // move to the next element in both
+        } while (lhs_iter++->second == rhs_iter++->second);
+        return false;
+    }
+
+    /**
+     * compute the negation of the roaring bitmap within a specified interval.
+     * areas outside the range are passed through unchanged.
+     */
+    void flip(uint64_t range_start, uint64_t range_end) {
+        uint32_t start_high = highBytes(range_start);
+        uint32_t start_low = lowBytes(range_start);
+        uint32_t end_high = highBytes(range_end);
+        uint32_t end_low = lowBytes(range_end);
+
+        if (start_high == end_high) {
+            roarings[start_high].flip(start_low, end_low);
+            return;
+        }
+        // we put std::numeric_limits<>::max/min in parentheses
+        // to avoid a clash with the Windows.h header under Windows
+        roarings[start_high].flip(start_low, (std::numeric_limits<uint32_t>::max)());
+        roarings[start_high++].setCopyOnWrite(copyOnWrite);
+
+        for (; start_high <= highBytes(range_end) - 1; ++start_high) {
+            roarings[start_high].flip((std::numeric_limits<uint32_t>::min)(),
+                                      (std::numeric_limits<uint32_t>::max)());
+            roarings[start_high].setCopyOnWrite(copyOnWrite);
+        }
+
+        roarings[start_high].flip((std::numeric_limits<uint32_t>::min)(), end_low);
+        roarings[start_high].setCopyOnWrite(copyOnWrite);
+    }
+
+    /**
+     *  Remove run-length encoding even when it is more space efficient
+     *  return whether a change was applied
+     */
+    bool removeRunCompression() {
+        return std::accumulate(roarings.begin(), roarings.end(), false,
+                               [](bool previous, std::pair<const uint32_t, Roaring>& map_entry) {
+                                   return map_entry.second.removeRunCompression() || previous;
+                               });
+    }
+
+    /** convert array and bitmap containers to run containers when it is more
+     * efficient;
+     * also convert from run containers when more space efficient.  Returns
+     * true if the result has at least one run container.
+     * Additional savings might be possible by calling shrinkToFit().
+     */
+    bool runOptimize() {
+        return std::accumulate(roarings.begin(), roarings.end(), false,
+                               [](bool previous, std::pair<const uint32_t, Roaring>& map_entry) {
+                                   return map_entry.second.runOptimize() || previous;
+                               });
+    }
+
+    /**
+     * If needed, reallocate memory to shrink the memory usage. Returns
+     * the number of bytes saved.
+    */
+    size_t shrinkToFit() {
+        size_t savedBytes = 0;
+        auto iter = roarings.begin();
+        while (iter != roarings.cend()) {
+            if (iter->second.isEmpty()) {
+                // empty Roarings are 84 bytes
+                savedBytes += 88;
+                roarings.erase(iter++);
+            } else {
+                savedBytes += iter->second.shrinkToFit();
+                iter++;
+            }
+        }
+        return savedBytes;
+    }
+
+    /**
+     * Iterate over the bitmap elements. The function iterator is called once
+     * for all the values with ptr (can be NULL) as the second parameter of each
+     * call.
+     *
+     * roaring_iterator64 is simply a pointer to a function that returns bool
+     * (true means that the iteration should continue while false means that it
+     * should stop), and takes (uint64_t,void*) as inputs.
+     */
+    void iterate(roaring_iterator64 iterator, void* ptr) const {
+        std::for_each(roarings.begin(), roarings.cend(),
+                      [=](const std::pair<uint32_t, Roaring>& map_entry) {
+                          roaring_iterate64(&map_entry.second.roaring, iterator,
+                                            uint64_t(map_entry.first) << 32, ptr);
+                      });
+    }
+
+    /**
+     * If the size of the roaring bitmap is strictly greater than rank,
+     * then this function returns true and sets element to the element
+     * of the given rank.
+     * Otherwise, it returns false.
+     */
+    bool select(uint64_t rnk, uint64_t* element) const {
+        for (const auto& map_entry : roarings) {
+            uint64_t sub_cardinality = (uint64_t)map_entry.second.cardinality();
+            if (rnk < sub_cardinality) {
+                *element = ((uint64_t)map_entry.first) << 32;
+                // assuming little endian
+                return map_entry.second.select((uint32_t)rnk, ((uint32_t*)element));
+            }
+            rnk -= sub_cardinality;
+        }
+        return false;
+    }
+
+    /**
+    * Returns the number of integers that are smaller than or equal to x.
+    */
+    uint64_t rank(uint64_t x) const {
+        uint64_t result = 0;
+        auto roaring_destination = roarings.find(highBytes(x));
+        if (roaring_destination != roarings.cend()) {
+            for (auto roaring_iter = roarings.cbegin(); roaring_iter != roaring_destination;
+                 ++roaring_iter) {
+                result += roaring_iter->second.cardinality();
+            }
+            result += roaring_destination->second.rank(lowBytes(x));
+            return result;
+        }
+        roaring_destination = roarings.lower_bound(highBytes(x));
+        for (auto roaring_iter = roarings.cbegin(); roaring_iter != roaring_destination;
+             ++roaring_iter) {
+            result += roaring_iter->second.cardinality();
+        }
+        return result;
+    }
+
+    /**
+     * write a bitmap to a char buffer.
+     * Returns how many bytes were written which should be getSizeInBytes().
+     */
+    size_t write(char* buf) const {
+        if (is32BitsEnough()) {
+            *(buf++) = BitmapTypeCode::type::BITMAP32;
+            auto it = roarings.find(0);
+            if (it == roarings.end()) { // empty bitmap
+                Roaring r;
+                return r.write(buf) + 1;
+            }
+            return it->second.write(buf) + 1;
+        }
+
+        const char* orig = buf;
+        // put type code
+        *(buf++) = BitmapTypeCode::type::BITMAP64;
+        // push map size
+        buf = (char*)encode_varint64((uint8_t*)buf, roarings.size());
+        std::for_each(roarings.cbegin(), roarings.cend(),
+                      [&buf](const std::pair<uint32_t, Roaring>& map_entry) {
+                          // push map key
+                          encode_fixed32_le((uint8_t*)buf, map_entry.first);
+                          buf += sizeof(uint32_t);
+                          // push map value Roaring
+                          buf += map_entry.second.write(buf);
+                      });
+        return buf - orig;
+    }
+
+    /**
+     * read a bitmap from a serialized version.
+     *
+     * This function is unsafe in the sense that if you provide bad data,
+     * many bytes could be read, possibly causing a buffer overflow. See also readSafe.
+     */
+    static Roaring64Map read(const char* buf) {
+        Roaring64Map result;
+
+        if (*buf == BitmapTypeCode::BITMAP32) {
+            Roaring read = Roaring::read(buf + 1);
+            result.emplaceOrInsert(0, read);
+            return result;
+        }
+
+        //DCHECK_EQ(BitmapTypeCode::BITMAP64, *buf);
+        buf++;
+
+        // get map size (a varint64 takes 1 to 10 bytes)
+        uint64_t map_size;
+        buf = reinterpret_cast<const char*>(
+                decode_varint64_ptr(reinterpret_cast<const uint8_t*>(buf),
+                                    reinterpret_cast<const uint8_t*>(buf + 10), &map_size));
+        //DCHECK(buf != nullptr);
+        for (uint64_t lcv = 0; lcv < map_size; lcv++) {
+            // get map key
+            uint32_t key = decode_fixed32_le(reinterpret_cast<const uint8_t*>(buf));
+            buf += sizeof(uint32_t);
+            // read map value Roaring
+            Roaring read = Roaring::read(buf);
+            result.emplaceOrInsert(key, read);
+            // forward buffer past the last Roaring Bitmap
+            buf += read.getSizeInBytes();
+        }
+        return result;
+    }
+
+    /**
+     * How many bytes are required to serialize this bitmap
+     */
+    size_t getSizeInBytes() const {
+        if (is32BitsEnough()) {
+            auto it = roarings.find(0);
+            if (it == roarings.end()) { // empty bitmap
+                Roaring r;
+                return r.getSizeInBytes() + 1;
+            }
+            return it->second.getSizeInBytes() + 1;
+        }
+        // start with type code, map size and size of keys for each map entry
+        size_t init = 1 + varint_length(roarings.size()) + roarings.size() * sizeof(uint32_t);
+        return std::accumulate(roarings.cbegin(), roarings.cend(), init,
+                               [=](size_t previous, const std::pair<uint32_t, Roaring>& map_entry) {
+                                   // add in bytes used by each Roaring
+                                   return previous + map_entry.second.getSizeInBytes();
+                               });
+    }
+
+    /**
+     * remove all elements
+     */
+    void clear() { roarings.clear(); }
+
+    /**
+     * Return whether all elements can be represented in 32 bits
+     */
+    bool is32BitsEnough() const { return maximum() <= std::numeric_limits<uint32_t>::max(); }
+
+    /**
+     * Computes the intersection between two bitmaps and returns new bitmap.
+     * The current bitmap and the provided bitmap are unchanged.
+     */
+    Roaring64Map operator&(const Roaring64Map& o) const { return Roaring64Map(*this) &= o; }
+
+    /**
+     * Computes the difference between two bitmaps and returns new bitmap.
+     * The current bitmap and the provided bitmap are unchanged.
+     */
+    Roaring64Map operator-(const Roaring64Map& o) const { return Roaring64Map(*this) -= o; }
+
+    /**
+     * Computes the union between two bitmaps and returns new bitmap.
+     * The current bitmap and the provided bitmap are unchanged.
+     */
+    Roaring64Map operator|(const Roaring64Map& o) const { return Roaring64Map(*this) |= o; }
+
+    /**
+     * Computes the symmetric difference (XOR) between two bitmaps and returns new bitmap.
+     * The current bitmap and the provided bitmap are unchanged.
+     */
+    Roaring64Map operator^(const Roaring64Map& o) const { return Roaring64Map(*this) ^= o; }
+
+    /**
+     * Set whether or not copy-on-write is applied.
+     */
+    void setCopyOnWrite(bool val) {
+        if (copyOnWrite == val) return;
+        copyOnWrite = val;
+        std::for_each(roarings.begin(), roarings.end(),
+                      [=](std::pair<const uint32_t, Roaring>& map_entry) {
+                          map_entry.second.setCopyOnWrite(val);
+                      });
+    }
+
+    /**
+     * Print the content of the bitmap
+     */
+    void printf() const {
+        if (!isEmpty()) {
+            auto map_iter = roarings.cbegin();
+            while (map_iter->second.isEmpty()) ++map_iter;
+            struct iter_data {
+                uint32_t high_bits;
+                char first_char = '{';
+            } outer_iter_data;
+            outer_iter_data.high_bits = map_iter->first;
+            map_iter->second.iterate(
+                    [](uint32_t low_bits, void* inner_iter_data) -> bool {
+                        std::printf("%c%llu", ((iter_data*)inner_iter_data)->first_char,
+                                    (long long unsigned)uniteBytes(
+                                            ((iter_data*)inner_iter_data)->high_bits, low_bits));
+                        ((iter_data*)inner_iter_data)->first_char = ',';
+                        return true;
+                    },
+                    (void*)&outer_iter_data);
+            std::for_each(
+                    ++map_iter, roarings.cend(), [](const std::pair<uint32_t, Roaring>& map_entry) {
+                        map_entry.second.iterate(
+                                [](uint32_t low_bits, void* high_bits) -> bool {
+                                    std::printf(",%llu", (long long unsigned)uniteBytes(
+                                                                 *(uint32_t*)high_bits, low_bits));
+                                    return true;
+                                },
+                                (void*)&map_entry.first);
+                    });
+        } else
+            std::printf("{");
+        std::printf("}\n");
+    }
+
+    /**
+     * Print the content of the bitmap into a string
+     */
+    std::string toString() const {
+        struct iter_data {
+            std::string str;
+            uint32_t high_bits;
+            char first_char = '{';
+        } outer_iter_data;
+        if (!isEmpty()) {
+            auto map_iter = roarings.cbegin();
+            while (map_iter->second.isEmpty()) ++map_iter;
+            outer_iter_data.high_bits = map_iter->first;
+            map_iter->second.iterate(
+                    [](uint32_t low_bits, void* inner_iter_data) -> bool {
+                        ((iter_data*)inner_iter_data)->str +=
+                                ((iter_data*)inner_iter_data)->first_char;
+                        ((iter_data*)inner_iter_data)->str += std::to_string(
+                                uniteBytes(((iter_data*)inner_iter_data)->high_bits, low_bits));
+                        ((iter_data*)inner_iter_data)->first_char = ',';
+                        return true;
+                    },
+                    (void*)&outer_iter_data);
+            std::for_each(
+                    ++map_iter, roarings.cend(),
+                    [&outer_iter_data](const std::pair<uint32_t, Roaring>& map_entry) {
+                        outer_iter_data.high_bits = map_entry.first;
+                        map_entry.second.iterate(
+                                [](uint32_t low_bits, void* inner_iter_data) -> bool {
+                                    ((iter_data*)inner_iter_data)->str +=
+                                            ((iter_data*)inner_iter_data)->first_char;
+                                    ((iter_data*)inner_iter_data)->str += std::to_string(uniteBytes(
+                                            ((iter_data*)inner_iter_data)->high_bits, low_bits));
+                                    return true;
+                                },
+                                (void*)&outer_iter_data);
+                    });
+        } else
+            outer_iter_data.str = '{';
+        outer_iter_data.str += '}';
+        return outer_iter_data.str;
+    }
+
+    /**
+     * Whether or not copy-on-write is active.
+     */
+    bool getCopyOnWrite() const { return copyOnWrite; }
+
+    /**
+     * computes the logical or (union) between "n" bitmaps (referenced by a
+     * pointer).
+     */
+    static Roaring64Map fastunion(size_t n, const Roaring64Map** inputs) {
+        Roaring64Map ans;
+        // not particularly fast
+        for (size_t lcv = 0; lcv < n; ++lcv) {
+            ans |= *(inputs[lcv]);
+        }
+        return ans;
+    }
+
+    friend class Roaring64MapSetBitForwardIterator;
+    typedef Roaring64MapSetBitForwardIterator const_iterator;
+
+    /**
+    * Returns an iterator that can be used to access the position of
+    * the set bits.
+    *
+    * The running time complexity of a full scan is proportional to the
+    * number of set bits: be aware that if you have long strings of 1s,
+    * this can be very inefficient.
+    *
+    * It can be much faster to use the toArray method if you want to
+    * retrieve the set bits.
+    */
+    const_iterator begin() const;
+
+    /**
+    * A bogus iterator that can be used together with begin()
+    * for constructions such as for(auto i = b.begin();
+    * i!=b.end(); ++i) {}
+    */
+    const_iterator end() const;
+
+private:
+    std::map<uint32_t, Roaring> roarings;
+    bool copyOnWrite = false;
+    static uint32_t highBytes(const uint64_t in) { return uint32_t(in >> 32); }
+    static uint32_t lowBytes(const uint64_t in) { return uint32_t(in); }
+    static uint64_t uniteBytes(const uint32_t highBytes, const uint32_t lowBytes) {
+        return (uint64_t(highBytes) << 32) | uint64_t(lowBytes);
+    }
+    // this is needed to tolerate gcc's C++11 libstdc++ lacking emplace
+    // prior to version 4.8
+    void emplaceOrInsert(const uint32_t key, const Roaring& value) {
+#if defined(__GLIBCXX__) && __GLIBCXX__ < 20130322
+        roarings.insert(std::make_pair(key, value));
+#else
+        roarings.emplace(std::make_pair(key, value));
+#endif
+    }
+};
+
+// Forked from https://github.com/RoaringBitmap/CRoaring/blob/v0.2.60/cpp/roaring64map.hh
+// Used to go through the set bits. Not optimally fast, but convenient.
+class Roaring64MapSetBitForwardIterator final {
+public:
+    typedef std::forward_iterator_tag iterator_category;
+    typedef uint64_t* pointer;
+    typedef uint64_t& reference_type;
+    typedef uint64_t value_type;
+    typedef int64_t difference_type;
+    typedef Roaring64MapSetBitForwardIterator type_of_iterator;
+
+    /**
+     * Provides the location of the set bit.
+     */
+    value_type operator*() const {
+        return Roaring64Map::uniteBytes(map_iter->first, i.current_value);
+    }
+
+    bool operator<(const type_of_iterator& o) {
+        if (map_iter == map_end) return false;
+        if (o.map_iter == o.map_end) return true;
+        return **this < *o;
+    }
+
+    bool operator<=(const type_of_iterator& o) {
+        if (o.map_iter == o.map_end) return true;
+        if (map_iter == map_end) return false;
+        return **this <= *o;
+    }
+
+    bool operator>(const type_of_iterator& o) {
+        if (o.map_iter == o.map_end) return false;
+        if (map_iter == map_end) return true;
+        return **this > *o;
+    }
+
+    bool operator>=(const type_of_iterator& o) {
+        if (map_iter == map_end) return true;
+        if (o.map_iter == o.map_end) return false;
+        return **this >= *o;
+    }
+
+    type_of_iterator& operator++() { // ++i, must return the incremented value
+        if (i.has_value == true) roaring_advance_uint32_iterator(&i);
+        while (!i.has_value) {
+            map_iter++;
+            if (map_iter == map_end) return *this;
+            roaring_init_iterator(&map_iter->second.roaring, &i);
+        }
+        return *this;
+    }
+
+    type_of_iterator operator++(int) { // i++, must return orig. value
+        Roaring64MapSetBitForwardIterator orig(*this);
+        roaring_advance_uint32_iterator(&i);
+        while (!i.has_value) {
+            map_iter++;
+            if (map_iter == map_end) return orig;
+            roaring_init_iterator(&map_iter->second.roaring, &i);
+        }
+        return orig;
+    }
+
+    bool operator==(const Roaring64MapSetBitForwardIterator& o) {
+        if (map_iter == map_end && o.map_iter == o.map_end) return true;
+        if (map_iter == map_end || o.map_iter == o.map_end) return false;
+        return **this == *o;
+    }
+
+    bool operator!=(const Roaring64MapSetBitForwardIterator& o) {
+        if (map_iter == map_end && o.map_iter == o.map_end) return false;
+        if (map_iter == map_end || o.map_iter == o.map_end) return true;
+        return **this != *o;
+    }
+
+    Roaring64MapSetBitForwardIterator(const Roaring64Map& parent, bool exhausted = false)
+            : map_end(parent.roarings.cend()) {
+        if (exhausted || parent.roarings.empty()) {
+            map_iter = parent.roarings.cend();
+        } else {
+            map_iter = parent.roarings.cbegin();
+            roaring_init_iterator(&map_iter->second.roaring, &i);
+            while (!i.has_value) {
+                map_iter++;
+                if (map_iter == map_end) return;
+                roaring_init_iterator(&map_iter->second.roaring, &i);
+            }
+        }
+    }
+
+private:
+    std::map<uint32_t, Roaring>::const_iterator map_iter;
+    std::map<uint32_t, Roaring>::const_iterator map_end;
+    roaring_uint32_iterator_t i;
+};
+
+inline Roaring64MapSetBitForwardIterator Roaring64Map::begin() const {
+    return Roaring64MapSetBitForwardIterator(*this);
+}
+
+inline Roaring64MapSetBitForwardIterator Roaring64Map::end() const {
+    return Roaring64MapSetBitForwardIterator(*this, true);
+}
+
+} // namespace detail
+
+// Represent the in-memory and on-disk structure of Doris's BITMAP data type.
+// Optimized for the case where the bitmap contains 0 or 1 element, which is
+// common in streaming load scenarios.
+class BitmapValue {
+public:
+    // Construct an empty bitmap.
+    BitmapValue() : _type(EMPTY) {}
+
+    // Construct a bitmap with one element.
+    explicit BitmapValue(uint64_t value) : _sv(value), _type(SINGLE) {}
+
+    // Construct a bitmap from serialized data.
+    explicit BitmapValue(const char* src) {
+        // Fall back to an empty bitmap so _type is never left uninitialized.
+        if (!deserialize(src)) _type = EMPTY;
+    }
+
+    // Construct a bitmap from given elements.
+    explicit BitmapValue(const std::vector<uint64_t>& bits) {
+        switch (bits.size()) {
+            case 0:
+                _type = EMPTY;
+                break;
+            case 1:
+                _type = SINGLE;
+                _sv = bits[0];
+                break;
+            default:
+                _type = BITMAP;
+                _bitmap.addMany(bits.size(), &bits[0]);
+        }
+    }
+
+    void add(uint64_t value) {
+        switch (_type) {
+            case EMPTY:
+                _sv = value;
+                _type = SINGLE;
+                break;
+            case SINGLE:
+                // no need to convert the type if the two values are equal
+                if (_sv == value) {
+                    break;
+                }
+                _bitmap.add(_sv);
+                _bitmap.add(value);
+                _type = BITMAP;
+                break;
+            case BITMAP:
+                _bitmap.add(value);
+        }
+    }
+
+    // Compute the union between the current bitmap and the provided bitmap.
+    // Possible type transitions are:
+    // EMPTY  -> SINGLE
+    // EMPTY  -> BITMAP
+    // SINGLE -> BITMAP
+    BitmapValue& operator|=(const BitmapValue& rhs) {
+        switch (rhs._type) {
+            case EMPTY:
+                break;
+            case SINGLE:
+                add(rhs._sv);
+                break;
+            case BITMAP:
+                switch (_type) {
+                    case EMPTY:
+                        _bitmap = rhs._bitmap;
+                        _type = BITMAP;
+                        break;
+                    case SINGLE:
+                        _bitmap = rhs._bitmap;
+                        _bitmap.add(_sv);
+                        _type = BITMAP;
+                        break;
+                    case BITMAP:
+                        _bitmap |= rhs._bitmap;
+                }
+                break;
+        }
+        return *this;
+    }
+
+    // Compute the intersection between the current bitmap and the provided bitmap.
+    // Possible type transitions are:
+    // SINGLE -> EMPTY
+    // BITMAP -> EMPTY
+    // BITMAP -> SINGLE
+    BitmapValue& operator&=(const BitmapValue& rhs) {
+        switch (rhs._type) {
+            case EMPTY:
+                _type = EMPTY;
+                _bitmap.clear();
+                break;
+            case SINGLE:
+                switch (_type) {
+                    case EMPTY:
+                        break;
+                    case SINGLE:
+                        if (_sv != rhs._sv) {
+                            _type = EMPTY;
+                        }
+                        break;
+                    case BITMAP:
+                        if (!_bitmap.contains(rhs._sv)) {
+                            _type = EMPTY;
+                        } else {
+                            _type = SINGLE;
+                            _sv = rhs._sv;
+                        }
+                        _bitmap.clear();
+                        break;
+                }
+                break;
+            case BITMAP:
+                switch (_type) {
+                    case EMPTY:
+                        break;
+                    case SINGLE:
+                        if (!rhs._bitmap.contains(_sv)) {
+                            _type = EMPTY;
+                        }
+                        break;
+                    case BITMAP:
+                        _bitmap &= rhs._bitmap;
+                        _convert_to_smaller_type();
+                        break;
+                }
+                break;
+        }
+        return *this;
+    }
+
+    // check if value x is present
+    bool contains(uint64_t x) {
+        switch (_type) {
+            case EMPTY:
+                return false;
+            case SINGLE:
+                return _sv == x;
+            case BITMAP:
+                return _bitmap.contains(x);
+        }
+        return false;
+    }
+
+    // TODO should the return type be uint64_t?
+    int64_t cardinality() const {
+        switch (_type) {
+            case EMPTY:
+                return 0;
+            case SINGLE:
+                return 1;
+            case BITMAP:
+                return _bitmap.cardinality();
+        }
+        return 0;
+    }
+
+    // Return how many bytes are required to serialize this bitmap.
+    // See BitmapTypeCode for the serialized format.
+    size_t getSizeInBytes() {
+        size_t res = 0;
+        switch (_type) {
+            case EMPTY:
+                res = 1;
+                break;
+            case SINGLE:
+                if (_sv <= std::numeric_limits<uint32_t>::max()) {
+                    res = 1 + sizeof(uint32_t);
+                } else {
+                    res = 1 + sizeof(uint64_t);
+                }
+                break;
+            case BITMAP:
+                //DCHECK(_bitmap.cardinality() > 1);
+                _bitmap.runOptimize();
+                _bitmap.shrinkToFit();
+                res = _bitmap.getSizeInBytes();
+                break;
+        }
+        return res;
+    }
+
+    // Serialize the bitmap value to dst, which should be large enough.
+    // Client should call `getSizeInBytes` first to get the serialized size.
+    void write(char* dst) {
+        switch (_type) {
+            case EMPTY:
+                *dst = BitmapTypeCode::EMPTY;
+                break;
+            case SINGLE:
+                if (_sv <= std::numeric_limits<uint32_t>::max()) {
+                    *(dst++) = BitmapTypeCode::SINGLE32;
+                    encode_fixed32_le(reinterpret_cast<uint8_t*>(dst), static_cast<uint32_t>(_sv));
+                } else {
+                    *(dst++) = BitmapTypeCode::SINGLE64;
+                    encode_fixed64_le(reinterpret_cast<uint8_t*>(dst), _sv);
+                }
+                break;
+            case BITMAP:
+                _bitmap.write(dst);
+                break;
+        }
+    }
+
+    // Deserialize a bitmap value from `src`.
+    // Return false if `src` begins with unknown type code, true otherwise.
+    bool deserialize(const char* src) {
+        //DCHECK(*src >= BitmapTypeCode::EMPTY && *src <= BitmapTypeCode::BITMAP64);
+        switch (*src) {
+            case BitmapTypeCode::EMPTY:
+                _type = EMPTY;
+                break;
+            case BitmapTypeCode::SINGLE32:
+                _type = SINGLE;
+                _sv = decode_fixed32_le(reinterpret_cast<const uint8_t*>(src + 1));
+                break;
+            case BitmapTypeCode::SINGLE64:
+                _type = SINGLE;
+                _sv = decode_fixed64_le(reinterpret_cast<const uint8_t*>(src + 1));
+                break;
+            case BitmapTypeCode::BITMAP32:
+            case BitmapTypeCode::BITMAP64:
+                _type = BITMAP;
+                _bitmap = detail::Roaring64Map::read(src);
+                break;
+            default:
+                return false;
+        }
+        return true;
+    }
+
+    // TODO limit string size to avoid OOM
+    std::string to_string() const {
+        std::stringstream ss;
+        switch (_type) {
+            case EMPTY:
+                break;
+            case SINGLE:
+                ss << _sv;
+                break;
+            case BITMAP: {
+                struct IterCtx {
+                    std::stringstream* ss = nullptr;
+                    bool first = true;
+                } iter_ctx;
+                iter_ctx.ss = &ss;
+
+                _bitmap.iterate(
+                        [](uint64_t value, void* c) -> bool {
+                            auto ctx = reinterpret_cast<IterCtx*>(c);
+                            if (ctx->first) {
+                                ctx->first = false;
+                            } else {
+                                (*ctx->ss) << ",";
+                            }
+                            (*ctx->ss) << value;
+                            return true;
+                        },
+                        &iter_ctx);
+                break;
+            }
+        }
+        return ss.str();
+    }
+
+private:
+    void _convert_to_smaller_type() {
+        if (_type == BITMAP) {
+            uint64_t c = _bitmap.cardinality();
+            if (c > 1) return;
+            if (c == 0) {
+                _type = EMPTY;
+            } else {
+                _type = SINGLE;
+                _sv = _bitmap.minimum();
+            }
+            _bitmap.clear();
+        }
+    }
+
+    enum BitmapDataType {
+        EMPTY = 0,
+        SINGLE = 1, // single element
+        BITMAP = 2 // more than one element
+    };
+    uint64_t _sv = 0; // store the single value when _type == SINGLE
+    detail::Roaring64Map _bitmap; // used when _type == BITMAP
+    BitmapDataType _type;
+};
+
+} // namespace doris_udf
+
+#endif //DORIS_CONTRIB_UDF_SRC_UDAF_BITMAP_BITMAP_VALUE_H
diff --git a/contrib/udf/src/udaf_orthogonal_bitmap/orthogonal_bitmap_function.cpp b/contrib/udf/src/udaf_orthogonal_bitmap/orthogonal_bitmap_function.cpp
new file mode 100644
index 0000000..9cc0eae
--- /dev/null
+++ b/contrib/udf/src/udaf_orthogonal_bitmap/orthogonal_bitmap_function.cpp
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "orthogonal_bitmap_function.h"
+#include "bitmap_value.h"
+#include "string_value.h"
+#include <iostream>
+
+namespace doris_udf {
+
+namespace detail {
+
+const int DATETIME_PACKED_TIME_BYTE_SIZE = 8;
+const int DATETIME_TYPE_BYTE_SIZE = 4;
+
+const int DECIMAL_BYTE_SIZE = 16;
+
+// get_val start
+template<typename ValType, typename T>
+T get_val(const ValType& x) {
+    return x.val;
+}
+
+template<>
+StringValue get_val(const StringVal& x) {
+    return StringValue::from_string_val(x);
+}
+// get_val end
+
+// serialize_size start
+template<typename T>
+int32_t serialize_size(const T& v) {
+    return sizeof(T);
+}
+
+template<>
+int32_t serialize_size(const StringValue& v) {
+    return v.len + 4;
+}
+// serialize_size end
+
+// write_to start
+template<typename T>
+char* write_to(const T& v, char* dest) {
+    size_t type_size = sizeof(T);
+    memcpy(dest, &v, type_size);
+    dest += type_size;
+    return dest;
+}
+
+template<>
+char* write_to(const StringValue& v, char* dest) {
+    *(int32_t*)dest = v.len;
+    dest += 4;
+    memcpy(dest, v.ptr, v.len);
+    dest += v.len;
+    return dest;
+}
+// write_to end
+
+// read_from start
+template<typename T>
+void read_from(const char** src, T* result) {
+    size_t type_size = sizeof(T);
+    memcpy(result, *src, type_size);
+    *src += type_size;
+}
+
+template<>
+void read_from(const char** src, StringValue* result) {
+    int32_t length = *(int32_t*)(*src);
+    *src += 4;
+    *result = StringValue((char *)*src, length);
+    *src += length;
+}
+// read_from end
+
+} // namespace detail
+
+static StringVal serialize(FunctionContext* ctx, BitmapValue* value) {
+    StringVal result(ctx, value->getSizeInBytes());
+    value->write((char*) result.ptr);
+    return result;
+}
+
+// Calculate the intersection of two or more bitmaps
+template<typename T>
+struct BitmapIntersect {
+public:
+    BitmapIntersect() {}
+
+    explicit BitmapIntersect(const char* src) {
+        deserialize(src);
+    }
+
+    void add_key(const T key) {
+        BitmapValue empty_bitmap;
+        _bitmaps[key] = empty_bitmap;
+    }
+
+    void update(const T& key, const BitmapValue& bitmap) {
+        if (_bitmaps.find(key) != _bitmaps.end()) {
+            _bitmaps[key] |= bitmap;
+        }
+    }
+
+    void merge(const BitmapIntersect& other) {
+        for (auto& kv: other._bitmaps) {
+            if (_bitmaps.find(kv.first) != _bitmaps.end()) {
+                _bitmaps[kv.first] |= kv.second;
+            } else {
+                _bitmaps[kv.first] = kv.second;
+            }
+        }
+    }
+
+    // calculate the cardinality of the intersection of all bitmaps in _bitmaps
+    int64_t intersect_count() const {
+        if (_bitmaps.empty()) {
+            return 0;
+        }
+
+        BitmapValue result;
+        auto it = _bitmaps.begin();
+        result |= it->second;
+        it++;
+        for (;it != _bitmaps.end(); it++) {
+            result &= it->second;
+        }
+
+        return result.cardinality();
+    }
+
+    // intersection
+    BitmapValue intersect() {
+        BitmapValue result;
+        if (_bitmaps.empty()) return result; // begin() must not be dereferenced on an empty map
+        auto it = _bitmaps.begin();
+        result |= it->second;
+        for (++it; it != _bitmaps.end(); ++it) {
+            result &= it->second;
+        }
+        return result;
+    }
+
+    // the serialized size
+    size_t size() {
+        size_t size = 4;
+        for (auto& kv: _bitmaps) {
+            size += detail::serialize_size(kv.first);
+            size += kv.second.getSizeInBytes();
+        }
+        return size;
+    }
+
+    //must call size() first
+    void serialize(char* dest) {
+        char* writer = dest;
+        *(int32_t*)writer = _bitmaps.size();
+        writer += 4;
+        for (auto& kv: _bitmaps) {
+            writer = detail::write_to(kv.first, writer);
+            kv.second.write(writer);
+            writer += kv.second.getSizeInBytes();
+        }
+    }
+
+    void deserialize(const char* src) {
+        const char* reader = src;
+        int32_t bitmaps_size = *(int32_t*)reader;
+        reader += 4;
+        for (int32_t i = 0; i < bitmaps_size; i++) {
+            T key;
+            detail::read_from(&reader, &key);
+            BitmapValue bitmap(reader);
+            reader += bitmap.getSizeInBytes();
+            _bitmaps[key] = bitmap;
+        }
+    }
+
+private:
+    std::map<T, BitmapValue> _bitmaps;
+};
+
+void OrthogonalBitmapFunctions::init() {
+}
+
+void OrthogonalBitmapFunctions::bitmap_union_count_init(FunctionContext* ctx, StringVal* dst) {
+    dst->is_null = false;
+    dst->len = sizeof(BitmapValue);
+    dst->ptr = (uint8_t*)new BitmapValue();
+}
+
+void OrthogonalBitmapFunctions::bitmap_union(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+    if (src.is_null) {
+        return;
+    }
+    auto dst_bitmap = reinterpret_cast<BitmapValue*>(dst->ptr);
+    // zero size means the src input is an agg object
+    if (src.len == 0) {
+        (*dst_bitmap) |= *reinterpret_cast<BitmapValue*>(src.ptr);
+    } else {
+        (*dst_bitmap) |= BitmapValue((char*) src.ptr);
+    }
+}
+
+StringVal OrthogonalBitmapFunctions::bitmap_serialize(FunctionContext* ctx, const StringVal& src) {
+    if (src.is_null) {
+        return src;
+    }
+
+    auto src_bitmap = reinterpret_cast<BitmapValue*>(src.ptr);
+    StringVal result = serialize(ctx, src_bitmap);
+    delete src_bitmap;
+    return result;
+}
+
+StringVal OrthogonalBitmapFunctions::bitmap_count_serialize(FunctionContext* ctx, const StringVal& src) {
+    if (src.is_null) {
+        return src;
+    }
+
+    auto src_bitmap = reinterpret_cast<BitmapValue*>(src.ptr);
+    int64_t val = src_bitmap->cardinality();
+    StringVal result(ctx, sizeof(int64_t));
+
+    *(int64_t*)result.ptr = val;
+    delete src_bitmap;
+    return result;
+
+}
+
+// This is an init function for bitmap_intersect.
+template<typename T, typename ValType>
+void OrthogonalBitmapFunctions::bitmap_intersect_init(FunctionContext* ctx, StringVal* dst) {
+    // constant args start from index 2
+    if (ctx->get_num_constant_args() > 1) {
+        dst->is_null = false;
+        dst->len = sizeof(BitmapIntersect<T>);
+        auto intersect = new BitmapIntersect<T>();
+
+        for (int i = 2; i < ctx->get_num_constant_args(); ++i) {
+            ValType* arg = reinterpret_cast<ValType*>(ctx->get_constant_arg(i));
+            intersect->add_key(detail::get_val<ValType, T>(*arg));
+        }
+
+        dst->ptr = (uint8_t*)intersect;
+    } else {
+        dst->is_null = false;
+        dst->len = sizeof(BitmapValue);
+        dst->ptr = (uint8_t*)new BitmapValue();
+    }
+}
+
+// This is an init function for intersect_count.
+template<typename T, typename ValType>
+void OrthogonalBitmapFunctions::bitmap_intersect_count_init(FunctionContext* ctx, StringVal* dst) {
+    if (ctx->get_num_constant_args() > 1) {
+        dst->is_null = false;
+        dst->len = sizeof(BitmapIntersect<T>);
+        auto intersect = new BitmapIntersect<T>();
+
+        // constant args start from index 2
+        for (int i = 2; i < ctx->get_num_constant_args(); ++i) {
+            ValType* arg = reinterpret_cast<ValType*>(ctx->get_constant_arg(i));
+            intersect->add_key(detail::get_val<ValType, T>(*arg));
+        }
+
+        dst->ptr = (uint8_t*)intersect;
+    } else {
+        dst->is_null = false;
+        dst->len = sizeof(int64_t);
+        dst->ptr = (uint8_t*)new int64_t;
+        *(int64_t *)dst->ptr = 0;
+    }
+}
+
+template<typename T, typename ValType>
+void OrthogonalBitmapFunctions::bitmap_intersect_update(FunctionContext* ctx, const StringVal& src, const ValType& key,
+                                              int num_key, const ValType* keys, const StringVal* dst) {
+    auto* dst_bitmap = reinterpret_cast<BitmapIntersect<T>*>(dst->ptr);
+    // zero size means the src input is an agg object
+    if (src.len == 0) {
+        dst_bitmap->update(detail::get_val<ValType, T>(key), *reinterpret_cast<BitmapValue*>(src.ptr));
+    } else {
+        dst_bitmap->update(detail::get_val<ValType, T>(key), BitmapValue((char*)src.ptr));
+    }
+}
+
+template<typename T>
+void OrthogonalBitmapFunctions::bitmap_intersect_merge(FunctionContext* ctx, const StringVal& src, const StringVal* dst) {
+    auto* dst_bitmap = reinterpret_cast<BitmapIntersect<T>*>(dst->ptr);
+    dst_bitmap->merge(BitmapIntersect<T>((char*)src.ptr));
+}
+
+template<typename T>
+StringVal OrthogonalBitmapFunctions::bitmap_intersect_serialize(FunctionContext* ctx, const StringVal& src) {
+    auto* src_bitmap = reinterpret_cast<BitmapIntersect<T>*>(src.ptr);
+    StringVal result(ctx, src_bitmap->size());
+    src_bitmap->serialize((char*)result.ptr);
+    delete src_bitmap;
+    return result;
+}
+
+template<typename T>
+BigIntVal OrthogonalBitmapFunctions::bitmap_intersect_finalize(FunctionContext* ctx, const StringVal& src) {
+    auto* src_bitmap = reinterpret_cast<BitmapIntersect<T>*>(src.ptr);
+    BigIntVal result = BigIntVal(src_bitmap->intersect_count());
+    delete src_bitmap;
+    return result;
+}
+
+void OrthogonalBitmapFunctions::bitmap_count_merge(FunctionContext* context, const StringVal& src, StringVal* dst) {
+    if (dst->len != sizeof(int64_t)) {
+        auto dst_bitmap = reinterpret_cast<BitmapValue*>(dst->ptr);
+        delete dst_bitmap;
+        dst->is_null = false;
+        dst->len = sizeof(int64_t);
+        dst->ptr = (uint8_t*)new int64_t;
+        *(int64_t *)dst->ptr = 0;
+    }
+    *(int64_t *)dst->ptr += *(int64_t *)src.ptr;
+}
+
+BigIntVal OrthogonalBitmapFunctions::bitmap_count_finalize(FunctionContext* context, const StringVal& src) {
+    auto *pval = reinterpret_cast<int64_t *>(src.ptr);
+    int64_t result = *pval;
+    delete pval;
+    return result;
+}
+
+template<typename T>
+StringVal OrthogonalBitmapFunctions::bitmap_intersect_count_serialize(FunctionContext* ctx, const StringVal& src) {
+    auto* src_bitmap = reinterpret_cast<BitmapIntersect<T>*>(src.ptr);
+    int64_t val = src_bitmap->intersect_count();
+    StringVal result(ctx, sizeof(int64_t));
+    *(int64_t*)result.ptr = val;
+    delete src_bitmap;
+    return result;
+}
+
+template<typename T>
+StringVal OrthogonalBitmapFunctions::bitmap_intersect_and_serialize(FunctionContext* ctx, const StringVal& src) {
+    auto* src_bitmap = reinterpret_cast<BitmapIntersect<T>*>(src.ptr);
+    BitmapValue bitmap_val = src_bitmap->intersect();
+    StringVal result = serialize(ctx, &bitmap_val);
+    delete src_bitmap;
+    return result;
+}
+
+
+template void OrthogonalBitmapFunctions::bitmap_intersect_count_init<int8_t, TinyIntVal>(
+    FunctionContext* ctx, StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_count_init<int16_t, SmallIntVal>(
+    FunctionContext* ctx, StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_count_init<int32_t, IntVal>(
+    FunctionContext* ctx, StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_count_init<int64_t, BigIntVal>(
+    FunctionContext* ctx, StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_count_init<float, FloatVal>(
+    FunctionContext* ctx, StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_count_init<double, DoubleVal>(
+    FunctionContext* ctx, StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_count_init<StringValue, StringVal>(
+    FunctionContext* ctx, StringVal* dst);
+
+template void OrthogonalBitmapFunctions::bitmap_intersect_init<int8_t, TinyIntVal>(
+    FunctionContext* ctx, StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_init<int16_t, SmallIntVal>(
+    FunctionContext* ctx, StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_init<int32_t, IntVal>(
+    FunctionContext* ctx, StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_init<int64_t, BigIntVal>(
+    FunctionContext* ctx, StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_init<float, FloatVal>(
+    FunctionContext* ctx, StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_init<double, DoubleVal>(
+    FunctionContext* ctx, StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_init<StringValue, StringVal>(
+    FunctionContext* ctx, StringVal* dst);
+
+template void OrthogonalBitmapFunctions::bitmap_intersect_update<int8_t, TinyIntVal>(
+    FunctionContext* ctx, const StringVal& src, const TinyIntVal& key,
+    int num_key, const TinyIntVal* keys, const StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_update<int16_t, SmallIntVal>(
+    FunctionContext* ctx, const StringVal& src, const SmallIntVal& key,
+    int num_key, const SmallIntVal* keys, const StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_update<int32_t, IntVal>(
+    FunctionContext* ctx, const StringVal& src, const IntVal& key,
+    int num_key, const IntVal* keys, const StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_update<int64_t, BigIntVal>(
+    FunctionContext* ctx, const StringVal& src, const BigIntVal& key,
+    int num_key, const BigIntVal* keys, const StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_update<float, FloatVal>(
+    FunctionContext* ctx, const StringVal& src, const FloatVal& key,
+    int num_key, const FloatVal* keys, const StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_update<double, DoubleVal>(
+    FunctionContext* ctx, const StringVal& src, const DoubleVal& key,
+    int num_key, const DoubleVal* keys, const StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_update<StringValue, StringVal>(
+    FunctionContext* ctx, const StringVal& src, const StringVal& key,
+    int num_key, const StringVal* keys, const StringVal* dst);
+
+template void OrthogonalBitmapFunctions::bitmap_intersect_merge<int8_t>(
+    FunctionContext* ctx, const StringVal& src, const StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_merge<int16_t>(
+    FunctionContext* ctx, const StringVal& src, const StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_merge<int32_t>(
+    FunctionContext* ctx, const StringVal& src, const StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_merge<int64_t>(
+    FunctionContext* ctx, const StringVal& src, const StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_merge<float>(
+    FunctionContext* ctx, const StringVal& src, const StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_merge<double>(
+    FunctionContext* ctx, const StringVal& src, const StringVal* dst);
+template void OrthogonalBitmapFunctions::bitmap_intersect_merge<StringValue>(
+    FunctionContext* ctx, const StringVal& src, const StringVal* dst);
+
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_serialize<int8_t>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_serialize<int16_t>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_serialize<int32_t>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_serialize<int64_t>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_serialize<float>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_serialize<double>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_serialize<StringValue>(
+    FunctionContext* ctx, const StringVal& src);
+
+template BigIntVal OrthogonalBitmapFunctions::bitmap_intersect_finalize<int8_t>(
+    FunctionContext* ctx, const StringVal& src);
+template BigIntVal OrthogonalBitmapFunctions::bitmap_intersect_finalize<int16_t>(
+    FunctionContext* ctx, const StringVal& src);
+template BigIntVal OrthogonalBitmapFunctions::bitmap_intersect_finalize<int32_t>(
+    FunctionContext* ctx, const StringVal& src);
+template BigIntVal OrthogonalBitmapFunctions::bitmap_intersect_finalize<int64_t>(
+    FunctionContext* ctx, const StringVal& src);
+template BigIntVal OrthogonalBitmapFunctions::bitmap_intersect_finalize<float>(
+    FunctionContext* ctx, const StringVal& src);
+template BigIntVal OrthogonalBitmapFunctions::bitmap_intersect_finalize<double>(
+    FunctionContext* ctx, const StringVal& src);
+template BigIntVal OrthogonalBitmapFunctions::bitmap_intersect_finalize<StringValue>(
+    FunctionContext* ctx, const StringVal& src);
+
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_count_serialize<int8_t>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_count_serialize<int16_t>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_count_serialize<int32_t>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_count_serialize<int64_t>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_count_serialize<float>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_count_serialize<double>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_count_serialize<StringValue>(
+    FunctionContext* ctx, const StringVal& src);
+
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_and_serialize<int8_t>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_and_serialize<int16_t>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_and_serialize<int32_t>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_and_serialize<int64_t>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_and_serialize<float>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_and_serialize<double>(
+    FunctionContext* ctx, const StringVal& src);
+template StringVal OrthogonalBitmapFunctions::bitmap_intersect_and_serialize<StringValue>(
+    FunctionContext* ctx, const StringVal& src);
+}
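
The count path in the file above (bitmap_intersect_count_serialize, bitmap_count_merge, bitmap_count_finalize) passes an 8-byte integer between aggregation levels and sums it on merge. The following is a minimal standalone sketch of that pattern, assuming a plain std::vector buffer in place of StringVal. The names Buffer, serialize_count, merge_count and finalize_counts are hypothetical and are not part of the Doris UDF API.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-in for the serialized intermediate value:
// one partial count stored as sizeof(int64_t) raw bytes.
using Buffer = std::vector<uint8_t>;

// First level: an instance reduces its local state to a single count
// and writes it into a fixed-size buffer.
Buffer serialize_count(int64_t partial_count) {
    Buffer out(sizeof(int64_t));
    std::memcpy(out.data(), &partial_count, sizeof(int64_t));
    return out;
}

// Second level: add one serialized partial count to the running total.
void merge_count(const Buffer& src, int64_t* total) {
    assert(src.size() == sizeof(int64_t));
    int64_t partial = 0;
    std::memcpy(&partial, src.data(), sizeof(int64_t));
    *total += partial;
}

// Merge every partial count into the final value.
int64_t finalize_counts(const std::vector<Buffer>& partials) {
    int64_t total = 0;
    for (const Buffer& b : partials) {
        merge_count(b, &total);
    }
    return total;
}
```

Summing partial counts gives the right answer only because the bitmaps held by different buckets are disjoint. The sketch copies bytes with memcpy rather than casting the buffer pointer, which sidesteps alignment assumptions about the buffer.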
diff --git a/contrib/udf/src/udaf_orthogonal_bitmap/orthogonal_bitmap_function.h b/contrib/udf/src/udaf_orthogonal_bitmap/orthogonal_bitmap_function.h
new file mode 100644
index 0000000..3de04cc
--- /dev/null
+++ b/contrib/udf/src/udaf_orthogonal_bitmap/orthogonal_bitmap_function.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "udf.h"
+
+namespace doris_udf {
+
+class OrthogonalBitmapFunctions {
+public:
+    static void init();
+    static void bitmap_union_count_init(FunctionContext* ctx, StringVal* slot);
+    static void bitmap_union(FunctionContext* ctx, const StringVal& src, StringVal* dst);
+    static StringVal bitmap_serialize(FunctionContext* ctx, const StringVal& src);
+    // bitmap_count_serialize
+    static StringVal bitmap_count_serialize(FunctionContext* ctx, const StringVal& src);
+    // count_merge 
+    static void bitmap_count_merge(FunctionContext* context, const StringVal& src, StringVal* dst);
+    // count_finalize
+    static BigIntVal bitmap_count_finalize(FunctionContext* context, const StringVal& src);
+
+
+    // intersect and intersect count
+    template<typename T, typename ValType>
+    static void bitmap_intersect_count_init(FunctionContext* ctx, StringVal* dst);
+    template<typename T, typename ValType>
+    static void bitmap_intersect_init(FunctionContext* ctx, StringVal* dst);
+
+    template<typename T, typename ValType>
+    static void bitmap_intersect_update(FunctionContext* ctx, const StringVal& src, const ValType& key,
+                                        int num_key, const ValType* keys, const StringVal* dst);
+    template<typename T>
+    static void bitmap_intersect_merge(FunctionContext* ctx, const StringVal& src, const StringVal* dst);
+    template<typename T>
+    static StringVal bitmap_intersect_serialize(FunctionContext* ctx, const StringVal& src);
+    template<typename T>
+    static BigIntVal bitmap_intersect_finalize(FunctionContext* ctx, const StringVal& src);
+
+    // bitmap_intersect_count_serialize
+    template<typename T>
+    static StringVal bitmap_intersect_count_serialize(FunctionContext* ctx, const StringVal& src);
+
+    // bitmap_intersect_and_serialize
+    template<typename T>
+    static StringVal bitmap_intersect_and_serialize(FunctionContext* ctx, const StringVal& src);
+
+};
+}
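
The intersect family declared above follows the init, update, serialize, merge, finalize sequence. The sketch below models the keyed state those functions appear to maintain, using std::set in place of BitmapValue. KeyedIntersect is a hypothetical class, and its behaviour is an assumption inferred from the call sites (add_key, update, intersect_count), since the definition of BitmapIntersect is not part of this section.

```cpp
#include <cstdint>
#include <map>
#include <set>

// Hypothetical model of a keyed intersect state (not the Doris class).
class KeyedIntersect {
public:
    // Register a filter value; it starts with an empty id set.
    void add_key(int64_t key) {
        sets_.emplace(key, std::set<uint64_t>{});
    }

    // Union ids into the set of a registered key; rows whose key was
    // never registered are ignored.
    void update(int64_t key, const std::set<uint64_t>& ids) {
        auto it = sets_.find(key);
        if (it == sets_.end()) {
            return;
        }
        it->second.insert(ids.begin(), ids.end());
    }

    // Intersect the id sets of all registered keys and return the size.
    int64_t intersect_count() const {
        if (sets_.empty()) {
            return 0;
        }
        auto it = sets_.begin();
        std::set<uint64_t> acc = it->second;
        for (++it; it != sets_.end(); ++it) {
            std::set<uint64_t> next;
            for (uint64_t id : acc) {
                if (it->second.count(id) != 0) {
                    next.insert(id);
                }
            }
            acc.swap(next);
        }
        return static_cast<int64_t>(acc.size());
    }

private:
    std::map<int64_t, std::set<uint64_t>> sets_;
};
```

Under this model a key that is registered but never updated keeps an empty set, so the intersection is empty as soon as one filter value has no matching rows.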
diff --git a/contrib/udf/src/udaf_orthogonal_bitmap/string_value.h b/contrib/udf/src/udaf_orthogonal_bitmap/string_value.h
new file mode 100644
index 0000000..860ccad
--- /dev/null
+++ b/contrib/udf/src/udaf_orthogonal_bitmap/string_value.h
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_CONTRIB_UDF_SRC_UDAF_BITMAP_STRING_VALUE_H
+#define DORIS_CONTRIB_UDF_SRC_UDAF_BITMAP_STRING_VALUE_H
+
+#include <string.h>
+#include "udf.h"
+
+namespace doris_udf {
+
+// The format of a string-typed slot.
+// The returned StringValue of all functions that return StringValue
+// shares its buffer with the parent.
+struct StringValue {
+    static const int MAX_LENGTH = (1 << 30);
+    // TODO: change ptr to an offset relative to a contiguous memory block,
+    // so that we can send row batches between nodes without having to swizzle
+    // pointers
+    // NOTE: This struct should keep the same memory layout with Slice, otherwise
+    // it will lead to BE crash.
+    // TODO(zc): we should unify this struct with Slice some day.
+    char* ptr;
+    size_t len;
+
+    StringValue(char* ptr, int len): ptr(ptr), len(len) {}
+    StringValue(): ptr(NULL), len(0) {}
+
+    /// Construct a StringValue from 's'.  's' must be valid for as long as
+    /// this object is valid.
+    explicit StringValue(const std::string& s) : 
+            ptr(const_cast<char*>(s.c_str())), len(s.size()) {
+    }
+
+    void replace(char* ptr, int len) {
+        this->ptr = ptr;
+        this->len = len;
+    }
+    static int string_compare(const char* s1, int n1, const char* s2, int n2, int len) {
+        int result = strncmp(s1, s2, len);
+        if (result != 0) {
+            return result;
+        }
+        return n1 - n2;
+    }
+
+    // Byte-by-byte comparison. Returns:
+    // this < other: -1
+    // this == other: 0
+    // this > other: 1
+    int compare(const StringValue& other) const {
+        int l = std::min(len, other.len);
+        if (l == 0) {
+            if (len == other.len) {
+                return 0;
+            } else if (len == 0) {
+                return -1;
+            } else {
+                return 1;
+            }
+        }
+        return string_compare(this->ptr, this->len, other.ptr, other.len, l);
+    }
+
+    // ==
+    bool eq(const StringValue& other) const {
+        if (this->len != other.len) {
+            return false;
+        }
+        return string_compare(this->ptr, this->len, other.ptr, other.len, this->len) == 0;
+    }
+
+    bool operator==(const StringValue& other) const {
+        return eq(other);
+    }
+    // !=
+    bool ne(const StringValue& other) const {
+        return !eq(other);
+    }
+    // <=
+    bool le(const StringValue& other) const {
+        return compare(other) <= 0;
+    }
+    // >=
+    bool ge(const StringValue& other) const {
+        return compare(other) >= 0;
+    }
+    // <
+    bool lt(const StringValue& other) const {
+        return compare(other) < 0;
+    }
+    // >
+    bool gt(const StringValue& other) const {
+        return compare(other) > 0;
+    }
+
+    bool operator!=(const StringValue& other) const {
+        return ne(other);
+    }
+
+    bool operator<=(const StringValue& other) const {
+        return le(other);
+    }
+
+    bool operator>=(const StringValue& other) const {
+        return ge(other);
+    }
+
+    bool operator<(const StringValue& other) const {
+        return lt(other);
+    }
+
+    bool operator>(const StringValue& other) const {
+        return gt(other);
+    }
+
+    std::string debug_string() const {
+        return std::string(ptr, len);
+    }
+
+    std::string to_string() const {
+        return std::string(ptr, len);
+    }
+
+    // Returns the substring starting at start_pos until the end of string.
+    StringValue substring(int start_pos) const {
+        return StringValue(ptr + start_pos, len - start_pos);
+    }
+
+    // Returns the substring starting at start_pos with given length.
+    // If new_len < 0 then the substring from start_pos to end of string is returned.
+    StringValue substring(int start_pos, int new_len) const {
+        return StringValue(ptr + start_pos, (new_len < 0) ? (len - start_pos) : new_len);
+    }
+
+    // Trims leading and trailing spaces.
+    StringValue trim() const {
+        // Remove leading and trailing spaces.
+        int32_t begin = 0;
+        while (begin < len && ptr[begin] == ' ') {
+            ++begin;
+        }
+        int32_t end = len - 1;
+        while (end > begin && ptr[end] == ' ') {
+            --end;
+        }
+        return StringValue(ptr + begin, end - begin + 1);
+    }
+
+    void to_string_val(doris_udf::StringVal* sv) const {
+        *sv = doris_udf::StringVal(reinterpret_cast<uint8_t*>(ptr), len);
+    }
+
+    static StringValue from_string_val(const doris_udf::StringVal& sv) {
+        return StringValue(reinterpret_cast<char*>(sv.ptr), sv.len);
+    }
+};
+
+}
+
+#endif
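
StringValue::compare orders two values by their common prefix and then by length. It relies on strncmp, which stops comparing at the first NUL byte, so two values that differ only after an embedded zero byte compare as equal. The source does not say whether such keys can occur. The sketch below shows the same ordering built on memcmp, which compares every byte. compare_bytes is a hypothetical helper that takes std::string arguments for brevity.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <string>

// Returns a negative value, zero, or a positive value when a sorts
// before, equal to, or after b. Compares the common prefix byte by
// byte, then falls back to the lengths.
int compare_bytes(const std::string& a, const std::string& b) {
    const std::size_t common = std::min(a.size(), b.size());
    const int r = (common == 0) ? 0 : std::memcmp(a.data(), b.data(), common);
    if (r != 0) {
        return r;
    }
    if (a.size() == b.size()) {
        return 0;
    }
    return a.size() < b.size() ? -1 : 1;
}
```

Comparing the lengths explicitly, instead of subtracting them, also avoids narrowing a size_t difference into an int.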
diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index a9421dd..295714c 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -150,7 +150,9 @@ module.exports = [
           {
             title: "Users contribute UDF",
             directoryPath: "contrib/",
-            children:[],       
+            children:[
+                "udaf-orthogonal-bitmap-manual",
+            ],
           },          
         ],
       },
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index 09e54d3..3f0db4e 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -154,7 +154,9 @@ module.exports = [
           {
             title: "用户贡献的 UDF",
             directoryPath: "contrib/",
-            children:[],       
+            children:[
+                "udaf-orthogonal-bitmap-manual",
+            ],
           },          
         ],
       },
diff --git a/docs/en/extending-doris/udf/contrib/udaf-orthogonal-bitmap-manual.md b/docs/en/extending-doris/udf/contrib/udaf-orthogonal-bitmap-manual.md
new file mode 100644
index 0000000..1f6cdca
--- /dev/null
+++ b/docs/en/extending-doris/udf/contrib/udaf-orthogonal-bitmap-manual.md
@@ -0,0 +1,249 @@
+---
+{
+    "title": "Orthogonal BITMAP calculation UDAF",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Orthogonal BITMAP calculation UDAF
+
+## Background
+
+The original Doris bitmap aggregate functions are general-purpose, but they perform poorly on intersections and unions of large-cardinality bitmaps above the 100-million level. Examining the bitmap aggregate function logic of the backend (BE) shows two main reasons. First, when the bitmap cardinality is large, for example when the bitmap data size exceeds 1 GB, network and disk I/O take a relatively long time; second, after the scan data, all the back-end be instances are transmit [...]
+
+The solution is to divide the bitmap column values by range and store the values of different ranges in different buckets, so that the bitmap values in different buckets are orthogonal and the data is distributed more evenly. At query time, the orthogonal bitmaps in each bucket are aggregated first, and the top-level node then directly merges and summarizes the aggregated values and outputs the result. This will greatl [...]
+
+## User guide
+
+1. Create a table with an added hid column that represents the ID range of the bitmap column values and serves as the hash bucket column
+2. Load data: when loading, divide the bitmap column values into ranges
+3. Compile the UDAF to produce the .so dynamic library
+4. Register the UDAF in DORIS, which loads the .so library at run time
+5. Usage scenarios
+
+### Create table
+
+The table must use the aggregate model. The data type is bitmap, and the aggregate function is bitmap_union.
+```
+CREATE TABLE `user_tag_bitmap` (
+  `tag` bigint(20) NULL COMMENT "user tag",
+  `hid` smallint(6) NULL COMMENT "Bucket ID",
+  `user_id` bitmap BITMAP_UNION NULL COMMENT ""
+) ENGINE=OLAP
+AGGREGATE KEY(`tag`, `hid`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`hid`) BUCKETS 3
+```
+The hid column is added to the table schema to indicate the ID range, and it is used as the hash bucket column.
+
+Note: choose the number of distinct hid values and the number of buckets carefully. The number of hid values should be at least 5 times the number of buckets, so that the hash bucketing is as balanced as possible.
+
+
+### Data Load
+
+``` 
+LOAD LABEL user_tag_bitmap_test
+(
+DATA INFILE('hdfs://abc')
+INTO TABLE user_tag_bitmap
+COLUMNS TERMINATED BY ','
+(tmp_tag, tmp_user_id)
+SET (
+tag = tmp_tag,
+hid = ceil(tmp_user_id/5000000),
+user_id = to_bitmap(tmp_user_id)
+)
+)
+...
+```
+
+Data format:
+
+``` 
+11111111,1
+11111112,2
+11111113,3
+11111114,4
+...
+```
+
+Note: the first column is the user tag, which has been converted from Chinese text into a number; the second column is the user ID.
+
+When loading data, cut the user bitmap value range vertically. For example, all user IDs in the range 1-5000000 get the same hid value, and rows with the same hid value are placed in the same bucket, so that the bitmap values in different buckets are orthogonal. The bitmap UDAF implementation can then use this orthogonality to perform the intersection and union calculation inside each bucket, and the calculation results will be shuffled to the top node fo [...]
+
+### Source code and compilation
+
+Source code:
+
+```
+contrib/udf/src/udaf_orthogonal_bitmap/
+|-- bitmap_value.h
+|-- CMakeLists.txt
+|-- orthogonal_bitmap_function.cpp
+|-- orthogonal_bitmap_function.h
+`-- string_value.h
+```
+
+Compile the UDAF:
+
+```
+$ cd contrib/udf
+$ sh build_udf.sh
+
+```
+
+Output location of libudaf_orthogonal_bitmap.so:
+
+```
+output/contrib/udf/lib/udaf_orthogonal_bitmap/libudaf_orthogonal_bitmap.so
+```
+
+
+### Register the UDAF with DORIS
+
+Set the following parameter before running Doris queries:
+
+```
+set parallel_fragment_exec_instance_num=5
+```
+
+Note: set the concurrency parameter according to the cluster's conditions to improve concurrent computing performance.
+
+Create the new UDAF aggregate functions in a MySQL client session. Each function is created by registering its function symbols, which are loaded from the dynamic library.
+
+#### orthogonal_bitmap_intersect 
+
+The bitmap intersection function
+
+Syntax:
+
+orthogonal_bitmap_intersect(bitmap_column, column_to_filter, filter_values)
+
+Parameters:
+
+The first parameter is the bitmap column, the second parameter is the dimension column used for filtering, and the third parameter is a variable-length list of values of the filter dimension column.
+
+Explanation:
+
+With this table schema, the query plan for this UDAF has two levels of aggregation. In the first level, the BE nodes (update and serialize) first hash-aggregate the bitmaps by key according to filter_values, and then intersect the bitmaps of all keys. The results are serialized and sent to the second-level BE nodes (merge and finalize), which union all the bitmap values received from the first-level nodes.
+
+Create UDAF:
+
+```
+drop FUNCTION orthogonal_bitmap_intersect(BITMAP,BIGINT,BIGINT, ...);
+CREATE AGGREGATE FUNCTION orthogonal_bitmap_intersect(BITMAP,BIGINT,BIGINT, ...) RETURNS BITMAP INTERMEDIATE varchar(1)
+PROPERTIES (
+"init_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions21bitmap_intersect_initIlNS_9BigIntValEEEvPNS_15FunctionContextEPNS_9StringValE",
+"update_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions23bitmap_intersect_updateIlNS_9BigIntValEEEvPNS_15FunctionContextERKNS_9StringValERKT0_iPS9_PS6_",
+"serialize_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions30bitmap_intersect_and_serializeIlEENS_9StringValEPNS_15FunctionContextERKS2_",
+"merge_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions12bitmap_unionEPNS_15FunctionContextERKNS_9StringValEPS3_",
+"finalize_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions16bitmap_serializeEPNS_15FunctionContextERKNS_9StringValE",
+"object_file"="http://ip:port/libudaf_orthogonal_bitmap.so" );
+
+```
+
+Note:
+
+1. column_to_filter and filter_values are declared as BIGINT here;
+
+2. the function symbols can be found with nm /xxx/xxx/libudaf_orthogonal_bitmap.so | grep "bitmap_"
+
+Example:
+
+```
+select BITMAP_COUNT(orthogonal_bitmap_intersect(user_id, tag, 13080800, 11110200)) from user_tag_bitmap  where tag in (13080800, 11110200);
+
+```
+
+#### orthogonal_bitmap_intersect_count 
+
+The bitmap intersection count function. The syntax is the same as the original intersect_count, but the implementation is different.
+
+Syntax:
+
+orthogonal_bitmap_intersect_count(bitmap_column, column_to_filter, filter_values)
+
+Parameters:
+
+The first parameter is the bitmap column, the second parameter is the dimension column for filtering, and the third parameter is the variable length parameter, which means different values of the filter dimension column
+
+Explanation:
+
+With this table schema, the query plan aggregates in two levels. In the first level, the BE nodes (update and serialize) first hash-aggregate the bitmaps by key according to filter_values, then intersect the bitmaps of all keys, and then count the intersection result. The count values are serialized and sent to the second-level BE nodes (merge and finalize). In the second level be nodes, the sum of all the count values from the first lev [...]
+
+Create UDAF:
+
+```
+drop FUNCTION orthogonal_bitmap_intersect_count(BITMAP,BIGINT,BIGINT, ...);
+CREATE AGGREGATE FUNCTION orthogonal_bitmap_intersect_count(BITMAP,BIGINT,BIGINT, ...) RETURNS BIGINT INTERMEDIATE varchar(1)
+PROPERTIES (
+"init_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions27bitmap_intersect_count_initIlNS_9BigIntValEEEvPNS_15FunctionContextEPNS_9StringValE",
+"update_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions23bitmap_intersect_updateIlNS_9BigIntValEEEvPNS_15FunctionContextERKNS_9StringValERKT0_iPS9_PS6_",
+"serialize_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions32bitmap_intersect_count_serializeIlEENS_9StringValEPNS_15FunctionContextERKS2_",
+"merge_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions18bitmap_count_mergeEPNS_15FunctionContextERKNS_9StringValEPS3_",
+"finalize_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions21bitmap_count_finalizeEPNS_15FunctionContextERKNS_9StringValE",
+"object_file"="http://ip:port/libudaf_orthogonal_bitmap.so" );
+```
+
+#### orthogonal_bitmap_union_count 
+
+The bitmap union count function. The syntax is the same as the original bitmap_union_count, but the implementation is different.
+
+Syntax:
+
+orthogonal_bitmap_union_count(bitmap_column)
+
+Explanation:
+
+With this table schema, this UDAF aggregates in two levels. In the first level, the BE nodes (update and serialize) union all the bitmaps and then count the resulting bitmap. The count values are serialized and sent to the second-level BE nodes (merge and finalize), which sum all the count values received from the first-level nodes.
+
+Create UDAF:
+
+```
+drop FUNCTION orthogonal_bitmap_union_count(BITMAP);
+CREATE AGGREGATE FUNCTION orthogonal_bitmap_union_count(BITMAP) RETURNS BIGINT INTERMEDIATE varchar(1)
+PROPERTIES (
+"init_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions23bitmap_union_count_initEPNS_15FunctionContextEPNS_9StringValE",
+"update_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions12bitmap_unionEPNS_15FunctionContextERKNS_9StringValEPS3_",
+"serialize_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions22bitmap_count_serializeEPNS_15FunctionContextERKNS_9StringValE",
+"merge_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions18bitmap_count_mergeEPNS_15FunctionContextERKNS_9StringValEPS3_",
+"finalize_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions21bitmap_count_finalizeEPNS_15FunctionContextERKNS_9StringValE",
+"object_file"="http://ip:port/libudaf_orthogonal_bitmap.so" );
+```
+
+### Applicable scenarios
+
+These functions suit scenarios where bitmaps can be computed orthogonally, such as retention, funnel, and user-profile analysis.
+
+Crowd selection:
+
+```
+select orthogonal_bitmap_intersect_count(user_id, tag, 13080800, 11110200) from user_tag_bitmap where tag in (13080800, 11110200);
+
+-- Note: 13080800 and 11110200 are user tags
+```
+
+Compute the deduplicated count of user_id:
+
+```
+select orthogonal_bitmap_union_count(user_id) from user_tag_bitmap where tag in (13080800, 11110200);
+```
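The two-layer union count described in the manual above can be illustrated with a small Python sketch. This is not the UDAF's C++ code: Python sets stand in for Doris bitmaps, and the function names and sample data are hypothetical. It only shows why summing per-node counts is valid when each node holds a disjoint ID range.

```python
# Illustrative sketch only: Python sets stand in for Doris bitmaps.

def first_layer_union_count(bitmaps):
    """First layer (update + serialize): union the bitmaps on one node, then count."""
    merged = set()
    for bitmap in bitmaps:
        merged |= bitmap
    return len(merged)


def second_layer_sum(counts):
    """Second layer (merge + finalize): sum the counts received from the first layer."""
    return sum(counts)


# Hypothetical data: each node holds user IDs from a disjoint range.
node_a = [{1, 2, 3}, {2, 3, 4}]
node_b = [{101, 102}, {102, 103}]

total = second_layer_sum(
    [first_layer_union_count(node_a), first_layer_union_count(node_b)]
)
print(total)
```

Because the ID ranges do not overlap, the sum of the per-node counts equals the size of the global union, so only small integers need to be sent to the second layer.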
diff --git a/docs/zh-CN/extending-doris/udf/contrib/udaf-orthogonal-bitmap-manual.md b/docs/zh-CN/extending-doris/udf/contrib/udaf-orthogonal-bitmap-manual.md
new file mode 100644
index 0000000..68a0a21
--- /dev/null
+++ b/docs/zh-CN/extending-doris/udf/contrib/udaf-orthogonal-bitmap-manual.md
@@ -0,0 +1,238 @@
+---
+{
+    "title": "Orthogonal BITMAP calculation UDAF",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
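+# Orthogonal BITMAP calculation UDAF
+
+## Background
+
+The original Bitmap aggregate functions in Doris are general-purpose, but they perform poorly when computing intersections and unions of bitmaps whose cardinality exceeds the hundred-million level. An inspection of the bitmap aggregation logic on the backend (BE) found two main causes. First, when the bitmap cardinality is large, for example when the bitmap size exceeds 1 GB, network and disk I/O take a long time. Second, after scanning the data, the BE instances transfer all of it to the top-level node for the intersection and union computation, which puts pressure on that single node and makes it the bottleneck.
+
+The solution is to partition the values of the bitmap column by range and store the values of different ranges in different buckets, which guarantees that the bitmap values in different buckets are orthogonal. At query time, the orthogonal bitmaps in each bucket are aggregated first, and the top-level node then simply merges the aggregated values and outputs the result. This greatly improves computational efficiency and removes the single-node bottleneck at the top level.
+
+## User guide
+
+1. Create a table and add a hid column that represents the ID range of the bitmap column values and serves as the hash bucketing column
+2. Load the data; during the load, partition the bitmap column values by range
+3. Compile the UDAF to produce the .so dynamic library
+4. Register the UDAF in Doris; Doris loads the .so library at runtime
+5. Usage scenarios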
+
+### Create table
+
+When creating the table, use the aggregate model. The data type is bitmap and the aggregate function is bitmap_union.
+
+```
+CREATE TABLE `user_tag_bitmap` (
+  `tag` bigint(20) NULL COMMENT "user tag",
+  `hid` smallint(6) NULL COMMENT "bucket id",
+  `user_id` bitmap BITMAP_UNION NULL COMMENT ""
+) ENGINE=OLAP
+AGGREGATE KEY(`tag`, `hid`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`hid`) BUCKETS 3
+```
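+A hid column is added to the table schema. It represents the ID range and serves as the hash bucketing column.
+
+Note: choose the number of hid values and BUCKETS sensibly. The number of distinct hid values should be at least 5 times BUCKETS so that the hash bucketing is as balanced as possible.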
+
+### Data Load
+
+``` 
+LOAD LABEL user_tag_bitmap_test
+(
+DATA INFILE('hdfs://abc')
+INTO TABLE user_tag_bitmap
+COLUMNS TERMINATED BY ','
+(tmp_tag, tmp_user_id)
+SET (
+tag = tmp_tag,
+hid = ceil(tmp_user_id/5000000),
+user_id = to_bitmap(tmp_user_id)
+)
+)
+Note: the value 5000000 is not fixed and can be adjusted as needed
+...
+```
+Data format:
+``` 
+11111111,1
+11111112,2
+11111113,3
+11111114,4
+...
+```
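+Note: the first column is the user tag, converted from Chinese text to a number.
+
+When loading the data, the range of user bitmap values is cut vertically. For example, user IDs in the range 1-5000000 share the same hid value, and rows with the same hid value are assigned to the same bucket, so that the bitmaps in different buckets are orthogonal. The bitmap UDAF implementation can exploit this orthogonality to compute intersections and unions within each bucket, and the results are then shuffled to the top node for aggregation.
+
+
+### Compile UDAF
+Source code: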
+```
+contrib/udf/src/udaf_orthogonal_bitmap/
+|-- bitmap_value.h
+|-- CMakeLists.txt
+|-- orthogonal_bitmap_function.cpp
+|-- orthogonal_bitmap_function.h
+`-- string_value.h
+```
+Compile UDAF:
+```
+$ cd contrib/udf
+$ sh build_udf.sh
+
+```
+Output directory of libudaf_orthogonal_bitmap.so:
+```
+output/contrib/udf/lib/udaf_orthogonal_bitmap/libudaf_orthogonal_bitmap.so
+```
+
+### Register UDAF in Doris
+Set the following parameter before querying Doris:
+```
+set parallel_fragment_exec_instance_num=5
+```
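+Note: set the parallelism parameter according to the cluster to improve concurrent computation performance.
+
+The new UDAF aggregate functions are created in a MySQL client session. Creation requires registering the function symbols, which are loaded from the .so dynamic library.
+
+#### orthogonal_bitmap_intersect 
+
+Bitmap intersection function.
+
+Syntax:
+
+   orthogonal_bitmap_intersect(bitmap_column, column_to_filter, filter_values)
+  
+Parameters:
+
+   The first parameter is the Bitmap column, the second is the dimension column used for filtering, and the third is a variable-length argument list giving the values of the filter dimension column to match.
+   
+Explanation:
+
+   The query plan aggregates in two layers. In the first layer, the BE nodes (update, serialize) hash-aggregate with filter_values as the key and then intersect the bitmaps of all keys. The result is serialized and sent to the second-layer BE nodes (merge, finalize). In the second layer, the BE nodes union all the bitmap values received from the first-layer nodes.
+   
+Create UDAF: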
+```
+drop FUNCTION orthogonal_bitmap_intersect(BITMAP,BIGINT,BIGINT, ...);
+CREATE AGGREGATE FUNCTION orthogonal_bitmap_intersect(BITMAP,BIGINT,BIGINT, ...) RETURNS BITMAP INTERMEDIATE varchar(1)
+PROPERTIES (
+"init_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions21bitmap_intersect_initIlNS_9BigIntValEEEvPNS_15FunctionContextEPNS_9StringValE",
+"update_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions23bitmap_intersect_updateIlNS_9BigIntValEEEvPNS_15FunctionContextERKNS_9StringValERKT0_iPS9_PS6_",
+"serialize_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions30bitmap_intersect_and_serializeIlEENS_9StringValEPNS_15FunctionContextERKS2_",
+"merge_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions12bitmap_unionEPNS_15FunctionContextERKNS_9StringValEPS3_",
+"finalize_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions16bitmap_serializeEPNS_15FunctionContextERKNS_9StringValE",
+"object_file"="http://ip:port/libudaf_orthogonal_bitmap.so" );
+
+```
+Note:
+
+1. The column_to_filter and filter_values arguments are declared as BIGINT here;
+
+2. Function symbols can be looked up with nm /xxx/xxx/libudaf_orthogonal_bitmap.so|grep "xxx"
+
+Example:
+```
+select BITMAP_COUNT(orthogonal_bitmap_intersect(user_id, tag, 13080800, 11110200)) from user_tag_bitmap  where tag in (13080800, 11110200);
+
+```
+
+#### orthogonal_bitmap_intersect_count  
+
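+Computes the count of a bitmap intersection. The syntax is the same as the original intersect_count, but the implementation is different.
+
+Syntax:
+
+  orthogonal_bitmap_intersect_count(bitmap_column, column_to_filter, filter_values)
+
+Parameters:
+
+  The first parameter is the Bitmap column, the second is the dimension column used for filtering, and the third parameter onward is a variable-length argument list giving the values of the filter dimension column to match.
+  
+Explanation:
+
+  The query plan aggregates in two layers. In the first layer, the BE nodes (update, serialize) hash-aggregate with filter_values as the key, intersect the bitmaps of all keys, and then count the intersection result. The count values are serialized and sent to the second-layer BE nodes (merge, finalize). In the second layer, the BE nodes sum all the count values received from the first-layer nodes.
+  
+Create UDAF: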
+```
+drop FUNCTION orthogonal_bitmap_intersect_count(BITMAP,BIGINT,BIGINT, ...);
+CREATE AGGREGATE FUNCTION orthogonal_bitmap_intersect_count(BITMAP,BIGINT,BIGINT, ...) RETURNS BIGINT INTERMEDIATE varchar(1)
+PROPERTIES (
+"init_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions27bitmap_intersect_count_initIlNS_9BigIntValEEEvPNS_15FunctionContextEPNS_9StringValE",
+"update_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions23bitmap_intersect_updateIlNS_9BigIntValEEEvPNS_15FunctionContextERKNS_9StringValERKT0_iPS9_PS6_",
+"serialize_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions32bitmap_intersect_count_serializeIlEENS_9StringValEPNS_15FunctionContextERKS2_",
+"merge_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions18bitmap_count_mergeEPNS_15FunctionContextERKNS_9StringValEPS3_",
+"finalize_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions21bitmap_count_finalizeEPNS_15FunctionContextERKNS_9StringValE",
+"object_file"="http://ip:port/libudaf_orthogonal_bitmap.so" );
+```
+
+#### orthogonal_bitmap_union_count 
+
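+Computes the count of a bitmap union. The syntax is the same as the original bitmap_union_count, but the implementation is different.
+
+Syntax:
+
+  orthogonal_bitmap_union_count(bitmap_column)
+ 
+Parameters:
+
+  The parameter is of type bitmap and is the column whose union count is to be computed.
+  
+Explanation:
+
+  The query plan aggregates in two layers. In the first layer, the BE nodes (update, serialize) union all the bitmaps and then count the resulting bitmap. The count values are serialized and sent to the second-layer BE nodes (merge, finalize). In the second layer, the BE nodes sum all the count values received from the first-layer nodes.
+
+Create UDAF: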
+```
+drop FUNCTION orthogonal_bitmap_union_count(BITMAP);
+CREATE AGGREGATE FUNCTION orthogonal_bitmap_union_count(BITMAP) RETURNS BIGINT INTERMEDIATE varchar(1)
+PROPERTIES (
+"init_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions23bitmap_union_count_initEPNS_15FunctionContextEPNS_9StringValE",
+"update_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions12bitmap_unionEPNS_15FunctionContextERKNS_9StringValEPS3_",
+"serialize_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions22bitmap_count_serializeEPNS_15FunctionContextERKNS_9StringValE",
+"merge_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions18bitmap_count_mergeEPNS_15FunctionContextERKNS_9StringValEPS3_",
+"finalize_fn"="_ZN9doris_udf25OrthogonalBitmapFunctions21bitmap_count_finalizeEPNS_15FunctionContextERKNS_9StringValE",
+"object_file"="http://ip:port/libudaf_orthogonal_bitmap.so" );
+```
+
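+### Usage scenarios
+
+These functions fit scenarios that require orthogonal computation on bitmaps, such as computing retention, funnels, and user profiles in user behavior analysis.
+
+
+Crowd selection: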
+
+```
+ select orthogonal_bitmap_intersect_count(user_id, tag, 13080800, 11110200) from user_tag_bitmap where tag in (13080800, 11110200);
+ -- Note: 13080800 and 11110200 are user tags
+```
+
+Compute the deduplicated count of user_id:
+
+```
+select orthogonal_bitmap_union_count(user_id) from user_tag_bitmap where tag in (13080800, 11110200);
+
+```
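As a rough illustration of the hid bucketing and the intersect-count flow described in the manual above, the following Python sketch mimics the idea with plain sets. It is a simplification under stated assumptions: sets stand in for bitmaps, each hid slice is processed on its own (in Doris several hid values can share one hash bucket), and `hid_of`, `load`, and `intersect_count` are hypothetical helper names, not part of the UDAF.

```python
import math
from collections import defaultdict

RANGE_SIZE = 5_000_000  # same divisor as the LOAD example; adjustable


def hid_of(user_id):
    """Mirror of hid = ceil(tmp_user_id/5000000) from the load statement."""
    return math.ceil(user_id / RANGE_SIZE)


def load(records):
    """Group (tag, user_id) rows into table[hid][tag] -> set of user IDs."""
    table = defaultdict(lambda: defaultdict(set))
    for tag, user_id in records:
        table[hid_of(user_id)][tag].add(user_id)
    return table


def intersect_count(table, tags):
    """Intersect and count inside each hid slice, then sum the counts."""
    total = 0
    for per_tag in table.values():
        # First layer: intersect the bitmaps of all requested tags, then count.
        common = set.intersection(*(per_tag.get(t, set()) for t in tags))
        # Second layer: only the integer counts are summed.
        total += len(common)
    return total


# Hypothetical sample rows: (tag, user_id)
records = [
    (13080800, 1), (11110200, 1), (13080800, 2),
    (13080800, 5000001), (11110200, 5000001), (11110200, 7000000),
]
table = load(records)
result = intersect_count(table, [13080800, 11110200])
print(result)
```

Since every user ID falls into exactly one hid slice, a user who carries both tags is counted in exactly one slice, which is why the per-slice counts can simply be added.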

