You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2023/01/10 03:54:09 UTC

[GitHub] [doris] eldenmoon commented on a diff in pull request #15491: [Enhancement](point query optimize) improve performace of point query on primary keys

eldenmoon commented on code in PR #15491:
URL: https://github.com/apache/doris/pull/15491#discussion_r1065298618


##########
be/src/util/jsonb_writer.h:
##########
@@ -234,6 +234,17 @@ class JsonbWriterT {
         return 0;
     }
 
+    uint32_t writeInt128(__int128_t v) {
+        if (!stack_.empty() && verifyValueState()) {

Review Comment:
   done



##########
fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java:
##########
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.common.Status;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.KeyTuple;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TExprList;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TResultBatch;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.protobuf.ByteString;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class PointQueryExec {
+    private static final Logger LOG = LogManager.getLogger(PointQueryExec.class);
+    private TNetworkAddress address;
+    // SlotRef sorted by column id
+    private Map<SlotRef, Expr> equalPredicats;
+    // ByteString serialized for prepared statement
+    private ByteString serializedDescTable;
+    private ByteString serializedOutputExpr;
+    private ArrayList<Expr> outputExprs;
+    private DescriptorTable descriptorTable;
+    private long tabletID = 0;
+    private long timeoutMs = 1000; // default 1s
+
+    private boolean isCancel = false;
+    private boolean isBinaryProtocol = false;
+    private long backendID;
+    // For parepared statement cached structure,
+    // there are some pre caculated structure in Backend TabletFetch service
+    // using this ID to find for this prepared statement
+    private UUID cacheID;
+
+    public PointQueryExec(Map<SlotRef, Expr> equalPredicats, DescriptorTable descTable,
+                    ArrayList<Expr> outputExprs) {
+        this.equalPredicats = equalPredicats;
+        this.descriptorTable = descTable;
+        this.outputExprs = outputExprs;
+    }
+
+    void setAddressAndBackendID(TNetworkAddress addr, long backendID) {
+        this.address = addr;
+        this.backendID = backendID;
+    }
+
+    public void setSerializedDescTable(ByteString serializedDescTable) {
+        this.serializedDescTable = serializedDescTable;
+    }
+
+    public void setSerializedOutputExpr(ByteString serializedOutputExpr) {
+        this.serializedOutputExpr = serializedOutputExpr;
+    }
+
+    public void setCacheID(UUID cacheID) {
+        this.cacheID = cacheID;
+    }
+
+    public void setTabletId(long tabletID) {
+        this.tabletID = tabletID;
+    }
+
+    public void setTimeout(long timeoutMs) {
+        this.timeoutMs = timeoutMs;
+    }
+
+    public void setBinaryProtocol(boolean isBinaryProtocol) {
+        this.isBinaryProtocol = isBinaryProtocol;
+    }
+
+    void addKeyTuples(
+                InternalService.PTabletKeyLookupRequest.Builder requestBuilder) {
+        // TODO handle IN predicates
+        KeyTuple.Builder kBuilder = KeyTuple.newBuilder();
+        for (Expr expr : equalPredicats.values()) {
+            LiteralExpr lexpr = (LiteralExpr) expr;
+            kBuilder.addKeyColumnRep(lexpr.getStringValue());
+        }
+        requestBuilder.addKeyTuples(kBuilder);
+    }
+
+    public RowBatch getNext(Status status) throws TException {
+        long timeoutTs = System.currentTimeMillis() + timeoutMs;
+        RowBatch rowBatch = new RowBatch();
+        InternalService.PTabletKeyLookupResponse pResult = null;
+        try {
+            if (serializedDescTable == null) {
+                serializedDescTable = ByteString.copyFrom(
+                        new TSerializer().serialize(descriptorTable.toThrift()));
+            }
+            if (serializedOutputExpr == null) {
+                List<TExpr> exprs = new ArrayList<>();
+                for (Expr expr : outputExprs) {
+                    exprs.add(expr.treeToThrift());
+                }
+                TExprList exprList = new TExprList(exprs);
+                serializedOutputExpr = ByteString.copyFrom(
+                        new TSerializer().serialize(exprList));
+            }
+
+            InternalService.PTabletKeyLookupRequest.Builder requestBuilder
+                        = InternalService.PTabletKeyLookupRequest.newBuilder()
+                            .setTabletId(tabletID)
+                            .setDescTbl(serializedDescTable)
+                            .setOutputExpr(serializedOutputExpr)
+                            .setIsBinaryRow(isBinaryProtocol);
+            if (cacheID != null) {
+                InternalService.UUID.Builder uuidBuilder = InternalService.UUID.newBuilder();
+                uuidBuilder.setUuidHigh(cacheID.getMostSignificantBits());
+                uuidBuilder.setUuidLow(cacheID.getLeastSignificantBits());
+                requestBuilder.setUuid(uuidBuilder);
+            }
+            addKeyTuples(requestBuilder);
+            InternalService.PTabletKeyLookupRequest request = requestBuilder.build();
+            Future<InternalService.PTabletKeyLookupResponse> futureResponse =
+                    BackendServiceProxy.getInstance().fetchTabletDataAsync(address, request);
+            while (pResult == null) {
+                long currentTs = System.currentTimeMillis();
+                if (currentTs >= timeoutTs) {
+                    LOG.warn("fetch result timeout {}", address);
+                    status.setStatus("query timeout");
+                    return null;

Review Comment:
   This needs retry logic.



##########
be/src/util/mysql_global.h:
##########
@@ -26,13 +26,16 @@ typedef unsigned char uchar;
 
 #define int1store(T, A) *((uint8_t*)(T)) = (uint8_t)(A)
 #define int2store(T, A) *((uint16_t*)(T)) = (uint16_t)(A)
+#define int4store(T, A) *((uint32_t*)(T)) = (uint32_t)(A)
 #define int3store(T, A)                           \
     do {                                          \
         *(T) = (uchar)((A));                      \
         *(T + 1) = (uchar)(((uint32_t)(A) >> 8)); \
         *(T + 2) = (uchar)(((A) >> 16));          \
     } while (0)
 #define int8store(T, A) *((int64_t*)(T)) = (uint64_t)(A)
+#define float4store(T, A) memcpy(T, (&A), sizeof(float))

Review Comment:
   done



##########
be/src/vec/jsonb/serialize.cpp:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/jsonb/serialize.h"
+
+#include "olap/hll.h"
+#include "olap/tablet_schema.h"
+#include "runtime/jsonb_value.h"
+#include "util/jsonb_stream.h"
+#include "util/jsonb_writer.h"
+#include "vec/common/arena.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+static inline bool is_column_null_at(int row, const IColumn* column, const doris::FieldType& type,
+                                     const StringRef& data_ref) {
+    if (type != OLAP_FIELD_TYPE_ARRAY) {
+        return data_ref.data == nullptr;
+    } else {
+        Field array;
+        column->get(row, array);
+        return array.is_null();
+    }
+}
+
+static bool is_jsonb_blob_type(FieldType type) {
+    return type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
+           type == OLAP_FIELD_TYPE_STRING || type == OLAP_FIELD_TYPE_STRUCT ||
+           type == OLAP_FIELD_TYPE_ARRAY || type == OLAP_FIELD_TYPE_MAP ||
+           type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT ||
+           type == OLAP_FIELD_TYPE_JSONB;
+}
+
+// jsonb -> column value
+static void deserialize_column(PrimitiveType type, JsonbValue* slot_value, MutableColumnPtr& dst) {
+    if (type == TYPE_ARRAY) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->deserialize_and_insert_from_arena(blob->getBlob());
+    } else if (type == TYPE_OBJECT) {
+        assert(slot_value->isBinary());
+        // TODO
+    } else if (type == TYPE_HLL) {
+        assert(slot_value->isBinary());
+        // TODO
+    } else if (is_string_type(type)) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->insert_data(blob->getBlob(), blob->getBlobLen());
+    } else {
+        switch (type) {
+        case TYPE_BOOLEAN: {
+            assert(slot_value->isInt8());
+            dst->insert(static_cast<JsonbInt8Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_TINYINT: {
+            assert(slot_value->isInt8());
+            dst->insert(static_cast<JsonbInt8Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_SMALLINT: {
+            assert(slot_value->isInt16());
+            dst->insert(static_cast<JsonbInt16Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_INT: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_BIGINT: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_LARGEINT: {
+            assert(slot_value->isInt128());
+            dst->insert(static_cast<JsonbInt128Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_FLOAT:

Review Comment:
   done



##########
be/src/service/internal_service.cpp:
##########
@@ -391,6 +393,25 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
     st.to_protobuf(result->mutable_status());
 }
 
+Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest* request,
+                                                PTabletKeyLookupResponse* response) {
+    TabletLookupMetric lookup_util;
+    RETURN_IF_ERROR(lookup_util.init(request, response));
+    RETURN_IF_ERROR(lookup_util.lookup_up());
+    LOG(INFO) << lookup_util.print_profile();

Review Comment:
   INFO is suitable, but it may be too frequent, so I changed it to LOG_EVERY_N.



##########
be/src/service/tablet_lookup_metric.h:
##########
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "butil/containers/doubly_buffered_data.h"
+#include "common/status.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "gutil/int128.h"
+#include "olap/tablet.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+
+namespace doris {
+
+// For caching point lookup pre allocted blocks and exprs
+class Reusable {
+public:
+    ~Reusable();
+
+    bool is_expired(int64_t ttl_ms) const {
+        return butil::gettimeofday_ms() - _create_timestamp > ttl_ms;
+    }
+
+    Status init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
+                size_t block_size = 1);
+
+    RuntimeState* runtime_state() { return _runtime_state.get(); }
+
+    std::unique_ptr<vectorized::Block> get_block();
+
+    // do not touch block after returned
+    void return_block(std::unique_ptr<vectorized::Block>& block);
+
+    TupleDescriptor* tuple_desc() { return _desc_tbl->get_tuple_descriptor(0); }
+
+    const std::vector<vectorized::VExprContext*>& output_exprs() { return _output_exprs_ctxs; }
+
+private:
+    // caching TupleDescriptor, output_expr, etc...
+    std::unique_ptr<RuntimeState> _runtime_state;
+    DescriptorTbl* _desc_tbl;
+    std::mutex _block_mutex;
+    // prevent from allocte too many tmp blocks
+    std::vector<std::unique_ptr<vectorized::Block>> _block_pool;
+    std::vector<vectorized::VExprContext*> _output_exprs_ctxs;
+    int64_t _create_timestamp = 0;
+};
+
+// A cache used for prepare stmt.
+// One connection per stmt perf uuid
+// Use DoublyBufferedData to wrap Cache for performance and thread safe,
+// since it's not barely modified
+class LookupCache {
+public:
+    // uuid to reusable
+    using Cache = phmap::flat_hash_map<uint128, std::shared_ptr<Reusable>>;
+    using CacheIter = Cache::iterator;
+
+    LookupCache() = default;
+    static LookupCache& instance() {
+        static LookupCache ins;
+        return ins;
+    }
+
+    void add(uint128 cache_id, std::shared_ptr<Reusable> item) {
+        assert(item != nullptr);
+        _double_buffer_cache.Modify(update_cache, std::make_pair(cache_id, item));
+    }
+
+    // find an item, return null if not exist
+    std::shared_ptr<Reusable> get(uint128 cache_id) {
+        butil::DoublyBufferedData<Cache>::ScopedPtr s;
+        if (_double_buffer_cache.Read(&s) != 0) {
+            LOG(WARNING) << "failed to get cache from double buffer data";
+            return nullptr;
+        }
+        auto it = s->find(cache_id);
+        if (it != s->end()) {
+            return it->second;
+        }
+        return nullptr;
+    }
+
+private:
+    butil::DoublyBufferedData<Cache> _double_buffer_cache;
+    // 30 seconds for expiring an item
+    int32_t _expir_seconds = config::tablet_lookup_cache_clean_interval;
+
+    static size_t update_cache(Cache& old_cache,
+                               const std::pair<uint128, std::shared_ptr<Reusable>>& p) {
+        old_cache.emplace(p);
+        return 1;
+    }
+
+    static size_t remove_items(Cache& old_cache, const std::vector<uint128>& keys) {
+        for (size_t i = 0; i < keys.size(); ++i) {
+            old_cache.erase(keys[i]);
+        }
+        return 1;
+    }
+
+    // Called from StorageEngine::_start_clean_lookup_cache
+    friend class StorageEngine;
+    void prune() {
+        std::vector<uint128> expired_keys;
+        {
+            butil::DoublyBufferedData<Cache>::ScopedPtr s;
+            if (_double_buffer_cache.Read(&s) != 0) {
+                return;
+            }
+            for (auto it = s->begin(); it != s->end(); ++it) {
+                if (it->second->is_expired(_expir_seconds * 1000)) {
+                    expired_keys.push_back(it->first);
+                }
+            }
+        }
+
+        _double_buffer_cache.Modify(remove_items, expired_keys);
+        LOG(INFO) << "prune lookup cache, total " << expired_keys.size() << " expired items";
+    }
+};
+
+struct Metrics {
+    Metrics()
+            : init_ns(TUnit::TIME_NS),
+              init_key_ns(TUnit::TIME_NS),
+              lookup_key_ns(TUnit::TIME_NS),
+              lookup_data_ns(TUnit::TIME_NS),
+              output_data_ns(TUnit::TIME_NS) {}
+    RuntimeProfile::Counter init_ns;
+    RuntimeProfile::Counter init_key_ns;
+    RuntimeProfile::Counter lookup_key_ns;
+    RuntimeProfile::Counter lookup_data_ns;
+    RuntimeProfile::Counter output_data_ns;
+};
+
+// An util to do tablet lookup
+class TabletLookupMetric {

Review Comment:
   changed to `PointQueryExecutor`



##########
be/src/olap/tablet_schema.cpp:
##########
@@ -920,11 +923,25 @@ bool operator==(const TabletSchema& a, const TabletSchema& b) {
     if (a._is_in_memory != b._is_in_memory) return false;
     if (a._delete_sign_idx != b._delete_sign_idx) return false;
     if (a._disable_auto_compaction != b._disable_auto_compaction) return false;
+    if (a._store_row_column != b._store_row_column) return false;
     return true;
 }
 
 bool operator!=(const TabletSchema& a, const TabletSchema& b) {
     return !(a == b);
 }
 
+const TabletColumn& TabletSchema::row_oriented_column() {
+    static TabletColumn source_column(OLAP_FIELD_AGGREGATION_NONE,

Review Comment:
   It already returns const.



##########
be/src/util/mysql_row_buffer.cpp:
##########
@@ -290,7 +392,11 @@ int MysqlRowBuffer::push_double(double data) {
     return 0;
 }
 
-int MysqlRowBuffer::push_time(double data) {
+template <bool is_binary_format>
+int MysqlRowBuffer<is_binary_format>::push_time(double data) {
+    if constexpr (is_binary_format) {
+        LOG(FATAL) << "not implemented";

Review Comment:
   added



##########
be/src/util/jsonb_stream.h:
##########
@@ -126,6 +126,11 @@ class JsonbOutStream : public std::ostream {
         size_ += len;
     }
 
+    void write(__int128 l) {
+        // TODO

Review Comment:
   this function is not used for now



##########
be/src/service/tablet_lookup_metric.h:
##########
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "butil/containers/doubly_buffered_data.h"
+#include "common/status.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "gutil/int128.h"
+#include "olap/tablet.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+
+namespace doris {
+
+// For caching point lookup pre allocted blocks and exprs
+class Reusable {
+public:
+    ~Reusable();
+
+    bool is_expired(int64_t ttl_ms) const {
+        return butil::gettimeofday_ms() - _create_timestamp > ttl_ms;
+    }
+
+    Status init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
+                size_t block_size = 1);
+
+    RuntimeState* runtime_state() { return _runtime_state.get(); }
+
+    std::unique_ptr<vectorized::Block> get_block();
+
+    // do not touch block after returned
+    void return_block(std::unique_ptr<vectorized::Block>& block);
+
+    TupleDescriptor* tuple_desc() { return _desc_tbl->get_tuple_descriptor(0); }
+
+    const std::vector<vectorized::VExprContext*>& output_exprs() { return _output_exprs_ctxs; }
+
+private:
+    // caching TupleDescriptor, output_expr, etc...
+    std::unique_ptr<RuntimeState> _runtime_state;

Review Comment:
   it's safe since Reusable is wrapped in std::shared_ptr



##########
be/src/vec/CMakeLists.txt:
##########
@@ -55,6 +55,7 @@ set(VEC_FILES
   columns/column_string.cpp
   columns/column_vector.cpp
   columns/columns_common.cpp
+  jsonb/serialize.cpp

Review Comment:
   done



##########
be/src/util/mysql_global.h:
##########
@@ -26,13 +26,16 @@ typedef unsigned char uchar;
 
 #define int1store(T, A) *((uint8_t*)(T)) = (uint8_t)(A)
 #define int2store(T, A) *((uint16_t*)(T)) = (uint16_t)(A)
+#define int4store(T, A) *((uint32_t*)(T)) = (uint32_t)(A)

Review Comment:
   done



##########
be/src/service/tablet_lookup_metric.cpp:
##########
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/tablet_lookup_metric.h"
+
+#include "olap/row_cursor.h"
+#include "olap/storage_engine.h"
+#include "service/internal_service.h"
+#include "util/defer_op.h"
+#include "util/key_util.h"
+#include "util/runtime_profile.h"
+#include "util/thrift_util.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vliteral.h"
+#include "vec/sink/vmysql_result_writer.cpp"
+
+namespace doris {
+
+Reusable::~Reusable() {
+    for (vectorized::VExprContext* ctx : _output_exprs_ctxs) {
+        ctx->close(_runtime_state.get());
+    }
+}
+
+Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
+                      size_t block_size) {
+    _runtime_state.reset(new RuntimeState());
+    RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &_desc_tbl));
+    _runtime_state->set_desc_tbl(_desc_tbl);
+    _block_pool.resize(block_size);
+    for (int i = 0; i < _block_pool.size(); ++i) {
+        _block_pool[i] = std::make_unique<vectorized::Block>(tuple_desc()->slots(), 10);
+    }
+
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_runtime_state->obj_pool(), output_exprs,
+                                                         &_output_exprs_ctxs));
+    RowDescriptor row_desc(tuple_desc(), false);
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_exprs_ctxs, _runtime_state.get(), row_desc));
+    _create_timestamp = butil::gettimeofday_ms();
+    return Status::OK();
+}
+
+std::unique_ptr<vectorized::Block> Reusable::get_block() {
+    std::lock_guard lock(_block_mutex);
+    if (_block_pool.empty()) {
+        return std::make_unique<vectorized::Block>(tuple_desc()->slots(), 4);

Review Comment:
   Making this configurable may lead to too many config options, so I use a magic number here.



##########
be/src/vec/jsonb/serialize.cpp:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/jsonb/serialize.h"
+
+#include "olap/hll.h"
+#include "olap/tablet_schema.h"
+#include "runtime/jsonb_value.h"
+#include "util/jsonb_stream.h"
+#include "util/jsonb_writer.h"
+#include "vec/common/arena.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+static inline bool is_column_null_at(int row, const IColumn* column, const doris::FieldType& type,
+                                     const StringRef& data_ref) {
+    if (type != OLAP_FIELD_TYPE_ARRAY) {
+        return data_ref.data == nullptr;
+    } else {
+        Field array;
+        column->get(row, array);
+        return array.is_null();
+    }
+}
+
+static bool is_jsonb_blob_type(FieldType type) {
+    return type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
+           type == OLAP_FIELD_TYPE_STRING || type == OLAP_FIELD_TYPE_STRUCT ||
+           type == OLAP_FIELD_TYPE_ARRAY || type == OLAP_FIELD_TYPE_MAP ||
+           type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT ||
+           type == OLAP_FIELD_TYPE_JSONB;
+}
+
+// jsonb -> column value
+static void deserialize_column(PrimitiveType type, JsonbValue* slot_value, MutableColumnPtr& dst) {
+    if (type == TYPE_ARRAY) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->deserialize_and_insert_from_arena(blob->getBlob());
+    } else if (type == TYPE_OBJECT) {
+        assert(slot_value->isBinary());
+        // TODO
+    } else if (type == TYPE_HLL) {
+        assert(slot_value->isBinary());
+        // TODO
+    } else if (is_string_type(type)) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->insert_data(blob->getBlob(), blob->getBlobLen());
+    } else {
+        switch (type) {
+        case TYPE_BOOLEAN: {
+            assert(slot_value->isInt8());
+            dst->insert(static_cast<JsonbInt8Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_TINYINT: {
+            assert(slot_value->isInt8());
+            dst->insert(static_cast<JsonbInt8Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_SMALLINT: {
+            assert(slot_value->isInt16());
+            dst->insert(static_cast<JsonbInt16Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_INT: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_BIGINT: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_LARGEINT: {
+            assert(slot_value->isInt128());
+            dst->insert(static_cast<JsonbInt128Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_FLOAT:
+        case TYPE_DOUBLE: {
+            assert(slot_value->isDouble());
+            dst->insert(static_cast<JsonbDoubleVal*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DATE: {
+            assert(slot_value->isInt32());
+            int32_t val = static_cast<JsonbInt32Val*>(slot_value)->val();
+            dst->insert_many_fix_len_data(reinterpret_cast<const char*>(&val), 1);
+            break;
+        }
+        case TYPE_DATETIME: {
+            assert(slot_value->isInt64());
+            int64_t val = static_cast<JsonbInt64Val*>(slot_value)->val();
+            dst->insert_many_fix_len_data(reinterpret_cast<const char*>(&val), 1);
+            break;
+        }
+        case TYPE_DATEV2: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DATETIMEV2: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DECIMAL32: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DECIMAL64: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DECIMAL128I: {
+            assert(slot_value->isInt128());
+            dst->insert(static_cast<JsonbInt128Val*>(slot_value)->val());
+            break;
+        }
+        default:
+            LOG(FATAL) << "unknow type " << type;
+            break;
+        }
+    }
+}
+
+// column value -> jsonb
+static void serialize_column(Arena* mem_pool, const TabletColumn& tablet_column,
+                             const IColumn* column, const StringRef& data_ref, int row,
+                             JsonbWriterT<JsonbOutStream>& jsonb_writer) {
+    jsonb_writer.writeKey(tablet_column.unique_id());
+    if (is_column_null_at(row, column, tablet_column.type(), data_ref)) {
+        jsonb_writer.writeNull();
+        return;
+    }
+    if (tablet_column.is_array_type()) {
+        const char* begin = nullptr;
+        StringRef value = column->serialize_value_into_arena(row, *mem_pool, begin);
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(value.data, value.size);
+        jsonb_writer.writeEndBinary();
+    } else if (tablet_column.type() == OLAP_FIELD_TYPE_OBJECT) {
+        auto bitmap_value = (BitmapValue*)(data_ref.data);
+        auto size = bitmap_value->getSizeInBytes();
+        // serialize the content of string
+        auto ptr = mem_pool->alloc(size);
+        bitmap_value->write(reinterpret_cast<char*>(ptr));
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(reinterpret_cast<const char*>(ptr), size);
+        jsonb_writer.writeEndBinary();
+    } else if (tablet_column.type() == OLAP_FIELD_TYPE_HLL) {
+        auto hll_value = (HyperLogLog*)(data_ref.data);
+        auto size = hll_value->max_serialized_size();
+        auto ptr = reinterpret_cast<char*>(mem_pool->alloc(size));
+        size_t actual_size = hll_value->serialize((uint8_t*)ptr);
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(reinterpret_cast<const char*>(ptr), actual_size);
+        jsonb_writer.writeEndBinary();
+    } else if (is_jsonb_blob_type(tablet_column.type())) {
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(reinterpret_cast<const char*>(data_ref.data), data_ref.size);
+        jsonb_writer.writeEndBinary();
+    } else {
+        switch (tablet_column.type()) {
+        case OLAP_FIELD_TYPE_BOOL: {
+            int8_t val = *reinterpret_cast<const int8_t*>(data_ref.data);
+            jsonb_writer.writeInt8(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_TINYINT: {
+            int8_t val = *reinterpret_cast<const int8_t*>(data_ref.data);
+            jsonb_writer.writeInt8(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_SMALLINT: {
+            int16_t val = *reinterpret_cast<const int16_t*>(data_ref.data);
+            jsonb_writer.writeInt16(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_INT: {
+            int32_t val = *reinterpret_cast<const int32_t*>(data_ref.data);
+            jsonb_writer.writeInt32(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_BIGINT: {
+            int64_t val = *reinterpret_cast<const int64_t*>(data_ref.data);
+            jsonb_writer.writeInt64(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_LARGEINT: {
+            __int128_t val = *reinterpret_cast<const __int128_t*>(data_ref.data);
+            jsonb_writer.writeInt128(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_FLOAT: {
+            float val = *reinterpret_cast<const float*>(data_ref.data);
+            jsonb_writer.writeDouble(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DOUBLE: {
+            double val = *reinterpret_cast<const double*>(data_ref.data);
+            jsonb_writer.writeDouble(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATE: {
+            const auto* datetime_cur = reinterpret_cast<const VecDateTimeValue*>(data_ref.data);
+            jsonb_writer.writeInt32(datetime_cur->to_olap_date());
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATETIME: {
+            const auto* datetime_cur = reinterpret_cast<const VecDateTimeValue*>(data_ref.data);
+            jsonb_writer.writeInt64(datetime_cur->to_olap_datetime());
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATEV2: {
+            uint32_t val = *reinterpret_cast<const uint32_t*>(data_ref.data);
+            jsonb_writer.writeInt32(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATETIMEV2: {
+            uint64_t val = *reinterpret_cast<const uint64_t*>(data_ref.data);
+            jsonb_writer.writeInt64(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL32: {
+            Decimal32::NativeType val =
+                    *reinterpret_cast<const Decimal32::NativeType*>(data_ref.data);
+            jsonb_writer.writeInt32(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL64: {
+            Decimal64::NativeType val =
+                    *reinterpret_cast<const Decimal64::NativeType*>(data_ref.data);
+            jsonb_writer.writeInt64(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL128I: {
+            Decimal128I::NativeType val =
+                    *reinterpret_cast<const Decimal128I::NativeType*>(data_ref.data);
+            jsonb_writer.writeInt128(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL:
+            LOG(FATAL) << "OLAP_FIELD_TYPE_DECIMAL not implemented use DecimalV3 instead";
+            break;
+        default:
+            LOG(FATAL) << "unknow type " << tablet_column.type();
+            break;
+        }
+    }
+}
+
+void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const Block& block,
+                                        ColumnString& dst, int num_cols) {
+    auto num_rows = block.rows();
+    Arena pool;
+    assert(num_cols <= block.columns());
+    for (int i = 0; i < num_rows; ++i) {
+        JsonbWriterT<JsonbOutStream> jsonb_writer;

Review Comment:
   Reusing a JsonbWriterT would require calling `reset` before each reuse, but `reset` is slower than its destructor.



##########
be/src/vec/jsonb/serialize.cpp:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/jsonb/serialize.h"
+
+#include "olap/hll.h"
+#include "olap/tablet_schema.h"
+#include "runtime/jsonb_value.h"
+#include "util/jsonb_stream.h"
+#include "util/jsonb_writer.h"
+#include "vec/common/arena.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+static inline bool is_column_null_at(int row, const IColumn* column, const doris::FieldType& type,
+                                     const StringRef& data_ref) {
+    if (type != OLAP_FIELD_TYPE_ARRAY) {
+        return data_ref.data == nullptr;
+    } else {
+        Field array;
+        column->get(row, array);
+        return array.is_null();
+    }
+}
+
+static bool is_jsonb_blob_type(FieldType type) {
+    return type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
+           type == OLAP_FIELD_TYPE_STRING || type == OLAP_FIELD_TYPE_STRUCT ||
+           type == OLAP_FIELD_TYPE_ARRAY || type == OLAP_FIELD_TYPE_MAP ||
+           type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT ||
+           type == OLAP_FIELD_TYPE_JSONB;
+}
+
+// jsonb -> column value
+static void deserialize_column(PrimitiveType type, JsonbValue* slot_value, MutableColumnPtr& dst) {
+    if (type == TYPE_ARRAY) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->deserialize_and_insert_from_arena(blob->getBlob());
+    } else if (type == TYPE_OBJECT) {
+        assert(slot_value->isBinary());
+        // TODO

Review Comment:
   done



##########
be/src/olap/tablet.cpp:
##########
@@ -1844,6 +1846,103 @@ TabletSchemaSPtr Tablet::get_max_version_schema(std::lock_guard<std::shared_mute
     return _max_version_schema;
 }
 
+RowsetSharedPtr Tablet::get_rowset(const RowsetId& rowset_id) {
+    for (auto& version_rowset : _rs_version_map) {
+        if (version_rowset.second->rowset_id() == rowset_id) {
+            return version_rowset.second;
+        }
+    }
+    for (auto& stale_version_rowset : _stale_rs_version_map) {
+        if (stale_version_rowset.second->rowset_id() == rowset_id) {
+            return stale_version_rowset.second;
+        }
+    }
+    return nullptr;
+}
+
+Status Tablet::lookup_row_data(const RowLocation& row_location, const TupleDescriptor* desc,
+                               vectorized::Block* block) {
+    // read row data
+    BetaRowsetSharedPtr rowset =
+            std::static_pointer_cast<BetaRowset>(get_rowset(row_location.rowset_id));
+    if (!rowset) {
+        return Status::NotFound(
+                fmt::format("rowset {} not found", row_location.rowset_id.to_string()));
+    }
+
+    const TabletSchemaSPtr tablet_schema = rowset->tablet_schema();
+    SegmentCacheHandle segment_cache;
+    RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true));
+    // find segment
+    auto it = std::find_if(segment_cache.get_segments().begin(), segment_cache.get_segments().end(),
+                           [&row_location](const segment_v2::SegmentSharedPtr& seg) {
+                               return seg->id() == row_location.segment_id;
+                           });
+    if (it == segment_cache.get_segments().end()) {
+        return Status::NotFound(fmt::format("rowset {} 's segemnt not found, seg_id {}",
+                                            row_location.rowset_id.to_string(),
+                                            row_location.segment_id));
+    }
+    // read from segment column by column, row by row
+    segment_v2::SegmentSharedPtr segment = *it;
+    size_t row_size = 0;
+    MonotonicStopWatch watch;
+    watch.start();
+    Defer _defer([&]() {
+        LOG_EVERY_N(INFO, 500) << "get a single_row, cost(us):" << watch.elapsed_time() / 1000
+                               << ", row_size:" << row_size;
+    });
+    // TODO(lhy) too long, refacor
+    if (tablet_schema->store_row_column()) {
+        // create _source column
+        segment_v2::ColumnIterator* column_iterator = nullptr;
+        RETURN_IF_ERROR(segment->new_row_column_iterator(&column_iterator));
+        std::unique_ptr<segment_v2::ColumnIterator> ptr_guard(column_iterator);
+        segment_v2::ColumnIteratorOptions opt;
+        OlapReaderStatistics stats;
+        opt.file_reader = segment->file_reader().get();
+        opt.stats = &stats;
+        opt.use_page_cache = !config::disable_storage_page_cache;
+        column_iterator->init(opt);
+        // get and parse tuple row
+        vectorized::MutableColumnPtr column_ptr =
+                vectorized::DataTypeFactory::instance()
+                        .create_data_type(TabletSchema::row_oriented_column())
+                        ->create_column();
+        std::vector<segment_v2::rowid_t> rowids {
+                static_cast<segment_v2::rowid_t>(row_location.row_id)};
+        RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1, column_ptr));
+        assert(column_ptr->size() == 1);
+        auto string_column = static_cast<vectorized::ColumnString*>(column_ptr.get());
+        vectorized::JsonbSerializeUtil::jsonb_to_block(*desc, *string_column, *block);
+        return Status::OK();
+    }
+    // read from segment column by column, row by row
+    for (int x = 0; x < desc->slots().size(); ++x) {
+        int index = tablet_schema->field_index(desc->slots()[x]->col_unique_id());
+        auto column = block->get_by_position(x).column->assume_mutable();
+        // TODO handle real default value

Review Comment:
   Deleted this logic; only the row-store logic is used now.



##########
be/src/runtime/descriptors.h:
##########
@@ -302,6 +307,13 @@ class JdbcTableDescriptor : public TableDescriptor {
 class TupleDescriptor {
 public:
     // virtual ~TupleDescriptor() {}
+    ~TupleDescriptor() {

Review Comment:
   added



##########
be/src/olap/delta_writer.cpp:
##########
@@ -440,6 +440,7 @@ void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
                                                     ptable_schema_param.indexes(i),
                                                     ori_tablet_schema);
     }
+

Review Comment:
   deleted



##########
be/src/olap/rowset/segment_v2/segment_iterator.cpp:
##########
@@ -244,6 +244,7 @@ Status SegmentIterator::_get_row_ranges_by_keys() {
         auto row_range = RowRanges::create_single(lower_rowid, upper_rowid);
         RowRanges::ranges_union(result_ranges, row_range, &result_ranges);
     }
+

Review Comment:
   deleted



##########
be/src/vec/jsonb/serialize.cpp:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/jsonb/serialize.h"
+
+#include "olap/hll.h"
+#include "olap/tablet_schema.h"
+#include "runtime/jsonb_value.h"
+#include "util/jsonb_stream.h"
+#include "util/jsonb_writer.h"
+#include "vec/common/arena.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+static inline bool is_column_null_at(int row, const IColumn* column, const doris::FieldType& type,
+                                     const StringRef& data_ref) {
+    if (type != OLAP_FIELD_TYPE_ARRAY) {
+        return data_ref.data == nullptr;
+    } else {
+        Field array;
+        column->get(row, array);
+        return array.is_null();
+    }
+}
+
+static bool is_jsonb_blob_type(FieldType type) {
+    return type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
+           type == OLAP_FIELD_TYPE_STRING || type == OLAP_FIELD_TYPE_STRUCT ||
+           type == OLAP_FIELD_TYPE_ARRAY || type == OLAP_FIELD_TYPE_MAP ||
+           type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT ||
+           type == OLAP_FIELD_TYPE_JSONB;
+}
+
+// jsonb -> column value
+static void deserialize_column(PrimitiveType type, JsonbValue* slot_value, MutableColumnPtr& dst) {
+    if (type == TYPE_ARRAY) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->deserialize_and_insert_from_arena(blob->getBlob());
+    } else if (type == TYPE_OBJECT) {
+        assert(slot_value->isBinary());
+        // TODO
+    } else if (type == TYPE_HLL) {
+        assert(slot_value->isBinary());
+        // TODO
+    } else if (is_string_type(type)) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->insert_data(blob->getBlob(), blob->getBlobLen());
+    } else {
+        switch (type) {
+        case TYPE_BOOLEAN: {
+            assert(slot_value->isInt8());
+            dst->insert(static_cast<JsonbInt8Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_TINYINT: {
+            assert(slot_value->isInt8());
+            dst->insert(static_cast<JsonbInt8Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_SMALLINT: {
+            assert(slot_value->isInt16());
+            dst->insert(static_cast<JsonbInt16Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_INT: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_BIGINT: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_LARGEINT: {
+            assert(slot_value->isInt128());
+            dst->insert(static_cast<JsonbInt128Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_FLOAT:
+        case TYPE_DOUBLE: {
+            assert(slot_value->isDouble());
+            dst->insert(static_cast<JsonbDoubleVal*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DATE: {
+            assert(slot_value->isInt32());
+            int32_t val = static_cast<JsonbInt32Val*>(slot_value)->val();
+            dst->insert_many_fix_len_data(reinterpret_cast<const char*>(&val), 1);
+            break;
+        }
+        case TYPE_DATETIME: {
+            assert(slot_value->isInt64());
+            int64_t val = static_cast<JsonbInt64Val*>(slot_value)->val();
+            dst->insert_many_fix_len_data(reinterpret_cast<const char*>(&val), 1);
+            break;
+        }
+        case TYPE_DATEV2: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DATETIMEV2: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DECIMAL32: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DECIMAL64: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DECIMAL128I: {
+            assert(slot_value->isInt128());
+            dst->insert(static_cast<JsonbInt128Val*>(slot_value)->val());
+            break;
+        }
+        default:
+            LOG(FATAL) << "unknow type " << type;
+            break;
+        }
+    }
+}
+
+// column value -> jsonb
+static void serialize_column(Arena* mem_pool, const TabletColumn& tablet_column,
+                             const IColumn* column, const StringRef& data_ref, int row,
+                             JsonbWriterT<JsonbOutStream>& jsonb_writer) {
+    jsonb_writer.writeKey(tablet_column.unique_id());
+    if (is_column_null_at(row, column, tablet_column.type(), data_ref)) {
+        jsonb_writer.writeNull();
+        return;
+    }
+    if (tablet_column.is_array_type()) {
+        const char* begin = nullptr;
+        StringRef value = column->serialize_value_into_arena(row, *mem_pool, begin);
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(value.data, value.size);
+        jsonb_writer.writeEndBinary();
+    } else if (tablet_column.type() == OLAP_FIELD_TYPE_OBJECT) {
+        auto bitmap_value = (BitmapValue*)(data_ref.data);
+        auto size = bitmap_value->getSizeInBytes();
+        // serialize the content of string
+        auto ptr = mem_pool->alloc(size);
+        bitmap_value->write(reinterpret_cast<char*>(ptr));
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(reinterpret_cast<const char*>(ptr), size);
+        jsonb_writer.writeEndBinary();
+    } else if (tablet_column.type() == OLAP_FIELD_TYPE_HLL) {
+        auto hll_value = (HyperLogLog*)(data_ref.data);
+        auto size = hll_value->max_serialized_size();
+        auto ptr = reinterpret_cast<char*>(mem_pool->alloc(size));
+        size_t actual_size = hll_value->serialize((uint8_t*)ptr);
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(reinterpret_cast<const char*>(ptr), actual_size);
+        jsonb_writer.writeEndBinary();
+    } else if (is_jsonb_blob_type(tablet_column.type())) {
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(reinterpret_cast<const char*>(data_ref.data), data_ref.size);
+        jsonb_writer.writeEndBinary();
+    } else {
+        switch (tablet_column.type()) {
+        case OLAP_FIELD_TYPE_BOOL: {
+            int8_t val = *reinterpret_cast<const int8_t*>(data_ref.data);
+            jsonb_writer.writeInt8(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_TINYINT: {
+            int8_t val = *reinterpret_cast<const int8_t*>(data_ref.data);
+            jsonb_writer.writeInt8(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_SMALLINT: {
+            int16_t val = *reinterpret_cast<const int16_t*>(data_ref.data);
+            jsonb_writer.writeInt16(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_INT: {
+            int32_t val = *reinterpret_cast<const int32_t*>(data_ref.data);
+            jsonb_writer.writeInt32(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_BIGINT: {
+            int64_t val = *reinterpret_cast<const int64_t*>(data_ref.data);
+            jsonb_writer.writeInt64(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_LARGEINT: {
+            __int128_t val = *reinterpret_cast<const __int128_t*>(data_ref.data);
+            jsonb_writer.writeInt128(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_FLOAT: {
+            float val = *reinterpret_cast<const float*>(data_ref.data);
+            jsonb_writer.writeDouble(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DOUBLE: {
+            double val = *reinterpret_cast<const double*>(data_ref.data);
+            jsonb_writer.writeDouble(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATE: {
+            const auto* datetime_cur = reinterpret_cast<const VecDateTimeValue*>(data_ref.data);
+            jsonb_writer.writeInt32(datetime_cur->to_olap_date());
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATETIME: {
+            const auto* datetime_cur = reinterpret_cast<const VecDateTimeValue*>(data_ref.data);
+            jsonb_writer.writeInt64(datetime_cur->to_olap_datetime());
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATEV2: {
+            uint32_t val = *reinterpret_cast<const uint32_t*>(data_ref.data);
+            jsonb_writer.writeInt32(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATETIMEV2: {
+            uint64_t val = *reinterpret_cast<const uint64_t*>(data_ref.data);
+            jsonb_writer.writeInt64(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL32: {
+            Decimal32::NativeType val =
+                    *reinterpret_cast<const Decimal32::NativeType*>(data_ref.data);
+            jsonb_writer.writeInt32(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL64: {
+            Decimal64::NativeType val =
+                    *reinterpret_cast<const Decimal64::NativeType*>(data_ref.data);
+            jsonb_writer.writeInt64(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL128I: {
+            Decimal128I::NativeType val =
+                    *reinterpret_cast<const Decimal128I::NativeType*>(data_ref.data);
+            jsonb_writer.writeInt128(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL:
+            LOG(FATAL) << "OLAP_FIELD_TYPE_DECIMAL not implemented use DecimalV3 instead";
+            break;
+        default:
+            LOG(FATAL) << "unknow type " << tablet_column.type();
+            break;
+        }
+    }
+}
+
+void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const Block& block,
+                                        ColumnString& dst, int num_cols) {
+    auto num_rows = block.rows();
+    Arena pool;
+    assert(num_cols <= block.columns());
+    for (int i = 0; i < num_rows; ++i) {
+        JsonbWriterT<JsonbOutStream> jsonb_writer;
+        jsonb_writer.writeStartObject();
+        for (int j = 0; j < num_cols; ++j) {
+            const auto& column = block.get_by_position(j).column;
+            const auto& tablet_column = schema.columns()[j];
+            const auto& data_ref =
+                    !tablet_column.is_array_type() ? column->get_data_at(i) : StringRef();

Review Comment:
   This is because ColumnArray does not keep a row's data in contiguous memory, so `get_data_at` cannot be used for it.



##########
be/src/service/tablet_lookup_metric.cpp:
##########
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/tablet_lookup_metric.h"
+
+#include "olap/row_cursor.h"
+#include "olap/storage_engine.h"
+#include "service/internal_service.h"
+#include "util/defer_op.h"
+#include "util/key_util.h"
+#include "util/runtime_profile.h"
+#include "util/thrift_util.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vliteral.h"
+#include "vec/sink/vmysql_result_writer.cpp"
+
+namespace doris {
+
+Reusable::~Reusable() {
+    for (vectorized::VExprContext* ctx : _output_exprs_ctxs) {
+        ctx->close(_runtime_state.get());
+    }
+}
+
+Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
+                      size_t block_size) {
+    _runtime_state.reset(new RuntimeState());
+    RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &_desc_tbl));
+    _runtime_state->set_desc_tbl(_desc_tbl);
+    _block_pool.resize(block_size);
+    for (int i = 0; i < _block_pool.size(); ++i) {
+        _block_pool[i] = std::make_unique<vectorized::Block>(tuple_desc()->slots(), 10);
+    }
+
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_runtime_state->obj_pool(), output_exprs,
+                                                         &_output_exprs_ctxs));
+    RowDescriptor row_desc(tuple_desc(), false);
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_exprs_ctxs, _runtime_state.get(), row_desc));
+    _create_timestamp = butil::gettimeofday_ms();
+    return Status::OK();
+}
+
+std::unique_ptr<vectorized::Block> Reusable::get_block() {
+    std::lock_guard lock(_block_mutex);
+    if (_block_pool.empty()) {
+        return std::make_unique<vectorized::Block>(tuple_desc()->slots(), 4);
+    }
+    auto block = std::move(_block_pool.back());
+    _block_pool.pop_back();
+    return block;
+}
+
+void Reusable::return_block(std::unique_ptr<vectorized::Block>& block) {
+    std::lock_guard lock(_block_mutex);
+    if (_block_pool.size() > 128) {
+        return;
+    }
+    block->clear_column_data();
+    _block_pool.push_back(std::move(block));
+}
+
+Status TabletLookupMetric::init(const PTabletKeyLookupRequest* request,
+                                PTabletKeyLookupResponse* response) {
+    SCOPED_TIMER(&_profile_metrics.init_ns);
+    _response = response;
+    // using cache
+    uint128 uuid {static_cast<uint64_t>(request->uuid().uuid_high()),
+                  static_cast<uint64_t>(request->uuid().uuid_low())};
+    auto cache_handle = LookupCache::instance().get(uuid);
+    _binary_row_format = request->is_binary_row();
+    if (cache_handle != nullptr) {
+        _reusable = cache_handle;
+        _hit_lookup_cache = true;
+    } else {
+        // init handle
+        auto reusable_ptr = std::make_shared<Reusable>();
+        TDescriptorTable t_desc_tbl;
+        TExprList t_output_exprs;
+        uint32_t len = request->desc_tbl().size();
+        RETURN_IF_ERROR(
+                deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(request->desc_tbl().data()),
+                                       &len, false, &t_desc_tbl));
+        len = request->output_expr().size();
+        RETURN_IF_ERROR(deserialize_thrift_msg(
+                reinterpret_cast<const uint8_t*>(request->output_expr().data()), &len, false,
+                &t_output_exprs));
+        _reusable = reusable_ptr;
+        if (uuid != 0) {
+            LookupCache::instance().add(uuid, reusable_ptr);
+            // could be reused by requests after, pre allocte more blocks
+            RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, 128));
+        } else {
+            RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, 1));
+        }
+    }
+    _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(request->tablet_id());
+    if (_tablet == nullptr) {
+        LOG(WARNING) << "failed to do tablet_fetch_data. tablet [" << request->tablet_id()
+                     << "] is not exist";
+        return Status::NotFound(fmt::format("tablet {} not exist", request->tablet_id()));
+    }
+    RETURN_IF_ERROR(_init_keys(request));
+    _result_block = _reusable->get_block();
+    DCHECK(_result_block != nullptr);
+    return Status::OK();
+}
+
+Status TabletLookupMetric::lookup_up() {
+    RETURN_IF_ERROR(_lookup_row_key());
+    RETURN_IF_ERROR(_lookup_row_data());
+    RETURN_IF_ERROR(_output_data());
+    return Status::OK();
+}
+
+std::string TabletLookupMetric::print_profile() {
+    auto init_us = _profile_metrics.init_ns.value() / 1000;
+    auto init_key_us = _profile_metrics.init_key_ns.value() / 1000;
+    auto lookup_key_us = _profile_metrics.lookup_key_ns.value() / 1000;
+    auto lookup_data_us = _profile_metrics.lookup_data_ns.value() / 1000;
+    auto output_data_us = _profile_metrics.output_data_ns.value() / 1000;
+    auto total_us = init_us + lookup_key_us + lookup_data_us + output_data_us;
+    return fmt::format(
+            ""
+            "[lookup profile:{}us] init:{}us, init_key:{}us,"
+            ""
+            ""
+            "lookup_key:{}us, lookup_data:{}us, output_data:{}us, hit_lookup_cache:{}"
+            ""
+            ""
+            ", is_binary_row:{}, output_columns:{}"
+            "",
+            total_us, init_us, init_key_us, lookup_key_us, lookup_data_us, output_data_us,
+            _hit_lookup_cache, _binary_row_format, _reusable->output_exprs().size());
+}
+
+Status TabletLookupMetric::_init_keys(const PTabletKeyLookupRequest* request) {
+    SCOPED_TIMER(&_profile_metrics.init_key_ns);
+    // 1. get primary key from conditions
+    std::vector<OlapTuple> olap_tuples;
+    olap_tuples.resize(request->key_tuples().size());
+    for (size_t i = 0; i < request->key_tuples().size(); ++i) {
+        const KeyTuple& key_tuple = request->key_tuples(i);
+        for (const std::string& key_col : key_tuple.key_column_rep()) {
+            olap_tuples[i].add_value(key_col);
+        }
+    }
+    _primary_keys.resize(olap_tuples.size());
+    // get row cursor and encode keys
+    for (size_t i = 0; i < olap_tuples.size(); ++i) {
+        RowCursor cursor;
+        RETURN_IF_ERROR(cursor.init_scan_key(_tablet->tablet_schema(), olap_tuples[i].values()));
+        RETURN_IF_ERROR(cursor.from_tuple(olap_tuples[i]));
+        encode_key_with_padding<RowCursor, true, true>(
+                &_primary_keys[i], cursor, _tablet->tablet_schema()->num_key_columns(), true);
+    }
+    return Status::OK();
+}
+
+Status TabletLookupMetric::_lookup_row_key() {
+    SCOPED_TIMER(&_profile_metrics.lookup_key_ns);
+    _row_locations.reserve(_primary_keys.size());
+    // 2. lookup row location
+    Status st;
+    for (size_t i = 0; i < _primary_keys.size(); ++i) {
+        RowLocation location;
+        st = (_tablet->lookup_row_key(_primary_keys[i], nullptr, &location,
+                                      INT32_MAX /*rethink?*/));
+        if (st.is_not_found()) {
+            continue;

Review Comment:
   I will add it in the next PR, since that PR changes the related code.



##########
be/src/runtime/datetime_value.h:
##########
@@ -588,11 +588,13 @@ class DateTimeValue {
 
     template <typename T>
     void convert_from_date_v2(doris::vectorized::DateV2Value<T>* dt) {
+        this->_microsecond = 0;

Review Comment:
   DateTimeValue lost `_microsecond` in this function. It is the old Doris DateTime format, so it does not need `_microsecond` precision, but in this PR `mysql_row_buffer` needs this precision to keep the code compatible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org