You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/11/16 13:59:51 UTC

[incubator-doris] branch master updated: [ComplexType] Restructure storage type to support complex types expending (#4905)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b48c768  [ComplexType] Restructure storage type to support complex types expending (#4905)
b48c768 is described below

commit b48c768dc7e026d006283875271969765ba122e1
Author: Lijia Liu <li...@yeah.net>
AuthorDate: Mon Nov 16 21:59:41 2020 +0800

    [ComplexType] Restructure storage type to support complex types expending (#4905)
    
    This CL includes:
    * Change the column metadata to a tree structure.
    * Refactor segment_v2.ColumnReader and segment_v2.ColumnWriter to support complex types.
    * Implement reading and writing of the array type.
---
 be/src/olap/CMakeLists.txt                         |   1 +
 be/src/olap/collection.h                           |  59 ++++
 be/src/olap/column_block.h                         |  52 ++--
 be/src/olap/column_vector.cpp                      | 212 +++++++++++++++
 be/src/olap/column_vector.h                        | 232 ++++++++++++++++
 be/src/olap/field.h                                |  61 ++++-
 be/src/olap/olap_common.h                          |   2 +-
 be/src/olap/row_block2.cpp                         |  26 +-
 be/src/olap/row_block2.h                           |  16 +-
 be/src/olap/rowset/column_reader.cpp               |   2 +-
 be/src/olap/rowset/column_writer.cpp               |   2 +-
 be/src/olap/rowset/segment_v2/binary_dict_page.cpp |  20 +-
 be/src/olap/rowset/segment_v2/binary_dict_page.h   |   4 +-
 .../olap/rowset/segment_v2/bitmap_index_reader.cpp |   9 +-
 be/src/olap/rowset/segment_v2/bitshuffle_page.h    |  13 +-
 .../segment_v2/bloom_filter_index_reader.cpp       |   9 +-
 .../segment_v2/bloom_filter_index_writer.cpp       |   2 +-
 be/src/olap/rowset/segment_v2/column_reader.cpp    | 183 +++++++++++--
 be/src/olap/rowset/segment_v2/column_reader.h      |  66 +++--
 be/src/olap/rowset/segment_v2/column_writer.cpp    | 260 +++++++++++++-----
 be/src/olap/rowset/segment_v2/column_writer.h      | 165 ++++++++---
 be/src/olap/rowset/segment_v2/encoding_info.cpp    |  15 +
 .../rowset/segment_v2/frame_of_reference_page.h    |  13 +-
 be/src/olap/rowset/segment_v2/page_decoder.h       |   9 +
 be/src/olap/rowset/segment_v2/parsed_page.h        |   5 +
 be/src/olap/rowset/segment_v2/plain_page.h         |  15 +-
 be/src/olap/rowset/segment_v2/segment.cpp          |   3 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  47 +++-
 be/src/olap/rowset/segment_v2/segment_writer.h     |   2 +
 be/src/olap/rowset/segment_v2/zone_map_index.cpp   |  13 +-
 be/src/olap/tablet_schema.cpp                      |  29 +-
 be/src/olap/tablet_schema.h                        |  16 ++
 be/src/olap/types.cpp                              | 113 +++++++-
 be/src/olap/types.h                                | 270 ++++++++++++++++--
 be/test/olap/CMakeLists.txt                        |   1 +
 be/test/olap/column_vector_test.cpp                | 180 ++++++++++++
 .../rowset/segment_v2/binary_dict_page_test.cpp    |  16 +-
 .../rowset/segment_v2/binary_plain_page_test.cpp   |  13 +-
 .../rowset/segment_v2/binary_prefix_page_test.cpp  |  22 +-
 .../rowset/segment_v2/bitshuffle_page_test.cpp     |  13 +-
 .../segment_v2/column_reader_writer_test.cpp       | 302 +++++++++++++++++----
 .../olap/rowset/segment_v2/encoding_info_test.cpp  |   4 +-
 .../segment_v2/frame_of_reference_page_test.cpp    |  14 +-
 be/test/olap/rowset/segment_v2/plain_page_test.cpp |  14 +-
 be/test/olap/rowset/segment_v2/rle_page_test.cpp   |  13 +-
 be/test/olap/rowset/segment_v2/segment_test.cpp    |  18 +-
 be/test/olap/schema_change_test.cpp                |   4 +-
 be/test/olap/storage_types_test.cpp                |  79 +++++-
 gensrc/proto/olap_file.proto                       |   2 +-
 gensrc/proto/segment_v2.proto                      |   5 +
 50 files changed, 2268 insertions(+), 378 deletions(-)

diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 3768b5e..4fa49fa 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -117,4 +117,5 @@ add_library(Olap STATIC
     task/engine_publish_version_task.cpp
     task/engine_alter_tablet_task.cpp
     olap_snapshot_converter.cpp
+    column_vector.cpp
 )
diff --git a/be/src/olap/collection.h b/be/src/olap/collection.h
new file mode 100644
index 0000000..719ae1f
--- /dev/null
+++ b/be/src/olap/collection.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <iostream>
+
+namespace doris {
+
+// cpp type for ARRAY
+struct Collection {
+    // child column data
+    void* data;
+    uint32_t length;
+    // No item is null if has_null is false.
+    // Items may (but need not) be null if has_null is true.
+    bool has_null;
+    // per-item null flags (one bool per item, not a packed bitmap)
+    bool* null_signs;
+
+    Collection(): data(nullptr), length(0), has_null(false), null_signs(nullptr) {}
+
+    explicit Collection(uint32_t length) : data(nullptr), length(length), has_null(false), null_signs(nullptr) {}
+
+    Collection(void* data, size_t length) : data(data), length(length), has_null(false), null_signs(nullptr) {}
+
+    Collection(void* data, size_t length, bool* null_signs) : data(data), length(length), has_null(true), null_signs(null_signs) {}
+
+    Collection(void* data, size_t length, bool has_null, bool* null_signs)
+    : data(data), length(length), has_null(has_null), null_signs(null_signs) {}
+
+    bool is_null_at(uint32_t index) {
+        return this->has_null && this->null_signs[index];
+    }
+
+    bool operator==(const Collection& y) const;
+    bool operator!=(const Collection& value) const;
+    bool operator<(const Collection& value) const;
+    bool operator<=(const Collection& value) const;
+    bool operator>(const Collection& value) const;
+    bool operator>=(const Collection& value) const;
+    int32_t cmp(const Collection& other) const;
+};
+
+}  // namespace doris
diff --git a/be/src/olap/column_block.h b/be/src/olap/column_block.h
index d05912a..f4d1254 100644
--- a/be/src/olap/column_block.h
+++ b/be/src/olap/column_block.h
@@ -19,6 +19,7 @@
 
 #include <cstdint>
 
+#include "olap/column_vector.h"
 #include "olap/types.h"
 #include "util/bitmap.h"
 
@@ -26,49 +27,45 @@ namespace doris {
 
 class MemPool;
 class TypeInfo;
-class ColumnBlockCell;
+struct ColumnBlockCell;
 
 // Block of data belong to a single column.
 // It doesn't own any data, user should keep the life of input data.
+// TODO llj Remove this class
 class ColumnBlock {
 public:
-    ColumnBlock(const TypeInfo* type_info, uint8_t* data, uint8_t* null_bitmap,
-                size_t nrows, MemPool* pool)
-        : _type_info(type_info), _data(data), _null_bitmap(null_bitmap),
-          _nrows(nrows), _delete_state(DEL_NOT_SATISFIED), _pool(pool) { }
-
-    const TypeInfo* type_info() const { return _type_info; }
-    uint8_t* data() const { return _data; }
-    uint8_t* null_bitmap() const { return _null_bitmap; }
-    bool is_nullable() const { return _null_bitmap != nullptr; }
+    ColumnBlock(ColumnVectorBatch* batch, MemPool* pool)
+        : _batch(batch), _pool(pool) { }
+
+    const TypeInfo* type_info() const { return _batch->type_info(); }
+    uint8_t* data() const { return _batch->data(); }
+    bool is_nullable() const { return _batch->is_nullable(); }
     MemPool* pool() const { return _pool; }
-    const uint8_t* cell_ptr(size_t idx) const { return _data + idx * _type_info->size(); }
-    uint8_t* mutable_cell_ptr(size_t idx) const { return _data + idx * _type_info->size(); }
+    const uint8_t* cell_ptr(size_t idx) const { return _batch->cell_ptr(idx); }
+    uint8_t* mutable_cell_ptr(size_t idx) const { return _batch->mutable_cell_ptr(idx); }
     bool is_null(size_t idx) const {
-        return is_nullable() && BitmapTest(_null_bitmap, idx);
+        return _batch->is_null_at(idx);
     }
     void set_is_null(size_t idx, bool is_null) const {
-        if (is_nullable()) {
-            BitmapChange(_null_bitmap, idx, is_null);
-        }
+        _batch->set_is_null(idx, is_null);
     }
 
-    ColumnBlockCell cell(size_t idx) const;
+    void set_null_bits(size_t offset, size_t num_rows, bool val) const {
+        _batch->set_null_bits(offset, num_rows, val);
+    }
 
-    size_t nrows() const { return _nrows; }
+    ColumnVectorBatch* vector_batch() const { return _batch; }
+
+    ColumnBlockCell cell(size_t idx) const;
 
     void set_delete_state(DelCondSatisfied delete_state) {
-        _delete_state = delete_state;
+        _batch->set_delete_state(delete_state);
     }
 
-    DelCondSatisfied delete_state() const { return _delete_state; }
+    DelCondSatisfied delete_state() const { return _batch->delete_state(); }
 
 private:
-    const TypeInfo* _type_info;
-    uint8_t* _data;
-    uint8_t* _null_bitmap;
-    size_t _nrows;
-    DelCondSatisfied _delete_state;
+    ColumnVectorBatch* _batch;
     MemPool* _pool;
 };
 
@@ -96,15 +93,16 @@ public:
         : _block(block), _row_offset(row_offset) {
     }
     void advance(size_t skip) { _row_offset += skip; }
-    size_t first_row_index() const { return _row_offset; }
     ColumnBlock* column_block() { return _block; }
     const TypeInfo* type_info() const { return _block->type_info(); }
     MemPool* pool() const { return _block->pool(); }
     void set_null_bits(size_t num_rows, bool val) {
-        BitmapChangeBits(_block->null_bitmap(), _row_offset, num_rows, val);
+        _block->set_null_bits(_row_offset, num_rows, val);
     }
     bool is_nullable() const { return _block->is_nullable(); }
     uint8_t* data() const { return _block->mutable_cell_ptr(_row_offset); }
+    size_t current_offset() { return _row_offset; }
+
 private:
     ColumnBlock* _block;
     size_t _row_offset;
diff --git a/be/src/olap/column_vector.cpp b/be/src/olap/column_vector.cpp
new file mode 100644
index 0000000..66272c9
--- /dev/null
+++ b/be/src/olap/column_vector.cpp
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "column_vector.h"
+#include "olap/field.h"
+
+namespace doris {
+
+ColumnVectorBatch::~ColumnVectorBatch() = default;
+
+Status ColumnVectorBatch::resize(size_t new_cap) {
+    if (_nullable) {
+        _null_signs.resize(new_cap);
+    }
+    _capacity = new_cap;
+    return Status::OK();
+}
+
+Status ColumnVectorBatch::create(size_t init_capacity,
+        bool is_nullable,
+        const TypeInfo* type_info,
+        Field* field,
+        std::unique_ptr<ColumnVectorBatch>* column_vector_batch) {
+    if (is_scalar_type(type_info->type())) {
+        std::unique_ptr<ColumnVectorBatch> local;
+        switch (type_info->type()) {
+            case OLAP_FIELD_TYPE_BOOL:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_BOOL>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_TINYINT:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_TINYINT>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_SMALLINT:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_SMALLINT>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_INT:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_INT>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_UNSIGNED_INT:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_UNSIGNED_INT>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_BIGINT:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_BIGINT>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_UNSIGNED_BIGINT>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_LARGEINT:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_LARGEINT>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_FLOAT:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_FLOAT>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_DOUBLE:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_DOUBLE>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_DECIMAL:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_DECIMAL>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_DATE:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_DATE>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_DATETIME:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_DATETIME>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_CHAR:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_CHAR>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_VARCHAR:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_VARCHAR>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_HLL:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_HLL>::CppType>(type_info, is_nullable));
+                break;
+            case OLAP_FIELD_TYPE_OBJECT:
+                local.reset(new ScalarColumnVectorBatch<CppTypeTraits<OLAP_FIELD_TYPE_OBJECT>::CppType>(type_info, is_nullable));
+                break;
+            default:
+                return Status::NotSupported("unsupported type for ColumnVectorBatch: " + std::to_string(type_info->type()));
+        }
+        RETURN_IF_ERROR(local->resize(init_capacity));
+        *column_vector_batch = std::move(local);
+        return Status::OK();
+    } else {
+        switch (type_info->type()) {
+        case FieldType::OLAP_FIELD_TYPE_ARRAY: {
+            if (field == nullptr) {
+                return Status::NotSupported("When create ArrayColumnVectorBatch, `Field` is indispensable");
+            }
+            std::unique_ptr<ColumnVectorBatch> local(new ArrayColumnVectorBatch(type_info, is_nullable, init_capacity, field));
+            RETURN_IF_ERROR(local->resize(init_capacity));
+            *column_vector_batch = std::move(local);
+            return Status::OK();
+        }
+        default:
+            return Status::NotSupported("unsupported type for ColumnVectorBatch: " + std::to_string(type_info->type()));
+        }
+    }
+}
+
+template <class ScalarType>
+ScalarColumnVectorBatch<ScalarType>::ScalarColumnVectorBatch(const TypeInfo* type_info, bool is_nullable)
+: ColumnVectorBatch(type_info, is_nullable), _data(0) {
+}
+
+template <class ScalarType>
+ScalarColumnVectorBatch<ScalarType>::~ScalarColumnVectorBatch() = default;
+
+template <class ScalarType>
+Status ScalarColumnVectorBatch<ScalarType>::resize(size_t new_cap) {
+    if (capacity() < new_cap) { // before first init, _capacity is 0.
+        RETURN_IF_ERROR(ColumnVectorBatch::resize(new_cap));
+        _data.resize(new_cap);
+    }
+    return Status::OK();
+}
+
+ArrayColumnVectorBatch::ArrayColumnVectorBatch(
+        const TypeInfo* type_info,
+        bool is_nullable,
+        size_t init_capacity,
+        Field* field) : ColumnVectorBatch(type_info, is_nullable), _data(0), _item_offsets(1) {
+    auto array_type_info = reinterpret_cast<const ArrayTypeInfo*>(type_info);
+    _item_offsets[0] = 0;
+    ColumnVectorBatch::create(
+            init_capacity * 2,
+            field->get_sub_field(0)->is_nullable(),
+            array_type_info->item_type_info(),
+            field->get_sub_field(0),
+            &_elements
+            );
+}
+
+ArrayColumnVectorBatch::~ArrayColumnVectorBatch() = default;
+
+Status ArrayColumnVectorBatch::resize(size_t new_cap) {
+    if (capacity() < new_cap) {
+        RETURN_IF_ERROR(ColumnVectorBatch::resize(new_cap));
+        _data.resize(new_cap);
+        _item_offsets.resize(new_cap + 1);
+    }
+    return Status::OK();
+}
+
+void ArrayColumnVectorBatch::put_item_ordinal(segment_v2::ordinal_t* ordinals, size_t start_idx, size_t size) {
+    size_t first_offset = _item_offsets[start_idx];
+    segment_v2::ordinal_t first_ordinal = ordinals[0];
+    size_t i = 0;
+    while (++i < size) {
+        _item_offsets[start_idx + i] = first_offset + (ordinals[i] - first_ordinal);
+    }
+}
+
+void ArrayColumnVectorBatch::prepare_for_read(size_t start_idx, size_t end_idx, bool item_has_null) {
+    for (size_t idx = start_idx; idx < end_idx; ++idx) {
+        if (!is_null_at(idx)) {
+            _data[idx] = Collection(
+                    _elements->mutable_cell_ptr(_item_offsets[idx]),
+                    _item_offsets[idx + 1] - _item_offsets[idx],
+                    item_has_null,
+                    _elements->is_nullable() ? const_cast<bool *>(&_elements->null_signs()[_item_offsets[idx]]) : nullptr
+                    );
+        }
+    }
+}
+
+template <class T>
+DataBuffer<T>::DataBuffer(size_t new_size): buf(nullptr), current_size(0), current_capacity(0) {
+    resize(new_size);
+}
+
+template <class T>
+DataBuffer<T>::~DataBuffer() {
+    for(uint64_t i = current_size; i > 0; --i) {
+        (buf + i - 1) -> ~T();
+    }
+    if (buf) {
+        std::free(buf);
+    }
+}
+
+template <class T>
+void DataBuffer<T>::resize(size_t new_size) {
+    if (new_size > current_capacity || !buf) {
+        if (buf) {
+            T* buf_old = buf;
+            buf = reinterpret_cast<T*>(std::malloc(sizeof(T) * new_size));
+            memcpy(buf, buf_old, sizeof(T) * current_size);
+            std::free(buf_old);
+        } else {
+            buf = reinterpret_cast<T*>(std::malloc(sizeof(T) * new_size));
+        }
+        current_capacity = new_size;
+    }
+    current_size = new_size;
+}
+
+} // namespace doris
diff --git a/be/src/olap/column_vector.h b/be/src/olap/column_vector.h
new file mode 100644
index 0000000..0fc41fc
--- /dev/null
+++ b/be/src/olap/column_vector.h
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/segment_v2/common.h" // for ordinal_t
+#include "olap/types.h"
+
+namespace doris {
+
+template <class T>
+class DataBuffer {
+private:
+    T* buf;
+    // current size
+    size_t current_size;
+    // maximal capacity (actual allocated memory)
+    size_t current_capacity;
+
+public:
+    explicit DataBuffer(size_t size = 0);
+    ~DataBuffer();
+    T* data() {
+        return buf;
+    }
+
+    const T* data() const {
+        return buf;
+    }
+
+    size_t size() {
+        return current_size;
+    }
+
+    size_t capacity() {
+        return current_capacity;
+    }
+
+    T& operator[](size_t i) {
+        return buf[i];
+    }
+
+    T& operator[](size_t i) const {
+        return buf[i];
+    }
+
+    void resize(size_t _size);
+};
+
+template class DataBuffer<bool>;
+template class DataBuffer<int8_t>;
+template class DataBuffer<int16_t>;
+template class DataBuffer<int32_t>;
+template class DataBuffer<uint32_t>;
+template class DataBuffer<int64_t>;
+template class DataBuffer<uint64_t>;
+template class DataBuffer<int128_t>;
+template class DataBuffer<float>;
+template class DataBuffer<double>;
+template class DataBuffer<decimal12_t>;
+template class DataBuffer<uint24_t>;
+template class DataBuffer<Slice>;
+template class DataBuffer<Collection>;
+
+// Base class that holds the per-row null flags of a column; the data array lives in the subclasses.
+class ColumnVectorBatch {
+public:
+    explicit ColumnVectorBatch(const TypeInfo* type_info, bool is_nullable)
+    : _type_info(type_info), _capacity(0), _delete_state(DEL_NOT_SATISFIED), _nullable(is_nullable), _null_signs(0) {}
+
+    virtual ~ColumnVectorBatch();
+
+    const TypeInfo* type_info() const { return _type_info; }
+
+    size_t capacity() const { return _capacity; }
+
+    bool is_nullable() const { return _nullable; }
+
+    bool is_null_at(size_t row_idx) {
+        return _nullable && _null_signs[row_idx];
+    }
+
+    void set_is_null(size_t idx, bool is_null) {
+        if (_nullable) {
+            _null_signs[idx] = is_null;
+        }
+    }
+
+    void set_null_bits(size_t offset, size_t num_rows, bool val) {
+        if (_nullable) {
+            memset(&_null_signs[offset], val, num_rows);
+        }
+    }
+
+    const bool* null_signs() const { return _null_signs.data(); }
+
+    void set_delete_state(DelCondSatisfied delete_state) {
+        _delete_state = delete_state;
+    }
+
+    DelCondSatisfied delete_state() const { return _delete_state; }
+
+    /**
+     * Change the number of slots to at least the given capacity.
+     * This function is not recursive into subtypes.
+     * Tips: This function will change `_capacity` attribute.
+     */
+    virtual Status resize(size_t new_cap);
+
+    // Get the start of the data.
+    virtual uint8_t* data() const = 0;
+
+    // Get the idx's cell_ptr
+    virtual const uint8_t* cell_ptr(size_t idx) const = 0;
+
+    // Get the idx's cell_ptr for write
+    virtual uint8_t* mutable_cell_ptr(size_t idx) = 0;
+
+
+    static Status create(size_t init_capacity,
+                         bool is_nullable,
+                         const TypeInfo* type_info,
+                         Field* field,
+                         std::unique_ptr<ColumnVectorBatch>* column_vector_batch);
+
+private:
+    const TypeInfo* _type_info;
+    size_t _capacity;
+    DelCondSatisfied _delete_state;
+    const bool _nullable;
+    DataBuffer<bool> _null_signs;
+};
+
+template <class ScalarCppType>
+class ScalarColumnVectorBatch : public ColumnVectorBatch {
+public:
+    explicit ScalarColumnVectorBatch(const TypeInfo* type_info, bool is_nullable);
+
+    ~ScalarColumnVectorBatch() override ;
+
+    Status resize(size_t new_cap) override;
+
+    // Get the start of the data.
+    uint8_t* data() const override { return const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(_data.data())); }
+
+    // Get the idx's cell_ptr
+    const uint8_t* cell_ptr(size_t idx) const override { return reinterpret_cast<uint8_t *>(&_data[idx]); }
+
+    // Get the idx's cell_ptr for write
+    uint8_t* mutable_cell_ptr(size_t idx) override { return reinterpret_cast<uint8_t *>(&_data[idx]); }
+
+private:
+    DataBuffer<ScalarCppType> _data;
+};
+
+class ArrayColumnVectorBatch : public ColumnVectorBatch {
+public:
+    explicit ArrayColumnVectorBatch(const TypeInfo* type_info, bool is_nullable, size_t init_capacity, Field* field);
+    ~ArrayColumnVectorBatch() override;
+    Status resize(size_t new_cap) override;
+
+    ColumnVectorBatch* elements() const { return _elements.get(); }
+
+    // Get the start of the data.
+    uint8_t* data() const override {
+        return reinterpret_cast<uint8 *>(const_cast<Collection *>(_data.data()));
+    }
+
+    // Get the idx's cell_ptr
+    const uint8_t* cell_ptr(size_t idx) const override {
+        return reinterpret_cast<const uint8*>(&_data[idx]);
+    }
+
+    // Get the idx's cell_ptr for write
+    uint8_t* mutable_cell_ptr(size_t idx) override {
+        return reinterpret_cast<uint8*>(&_data[idx]);
+    }
+
+    size_t item_offset(size_t idx) const {
+        return _item_offsets[idx];
+    }
+
+    // From `start_idx`, put `size` ordinals to _item_offsets
+    // Ex:
+    // original _item_offsets: 0 3 5 9; ordinals to be added: 100 105 111; size: 3; start_idx: 3
+    // --> _item_offsets: 0 3 5 (9 + 100 - 100) (9 + 105 - 100) (9 + 111 - 100)
+    // _item_offsets becomes 0 3 5 9 14 20
+    void put_item_ordinal(segment_v2::ordinal_t* ordinals, size_t start_idx, size_t size);
+
+    // Generate collection slots.
+    void prepare_for_read(size_t start_idx, size_t end_idx, bool item_has_null);
+
+private:
+    DataBuffer<Collection> _data;
+
+    std::unique_ptr<ColumnVectorBatch> _elements;
+
+    // Stores each collection's start offsets in _elements.
+    DataBuffer<size_t> _item_offsets;
+};
+
+template class ScalarColumnVectorBatch<bool>;
+template class ScalarColumnVectorBatch<int8_t>;
+template class ScalarColumnVectorBatch<int16_t>;
+template class ScalarColumnVectorBatch<int32_t>;
+template class ScalarColumnVectorBatch<uint32_t>;
+template class ScalarColumnVectorBatch<int64_t>;
+template class ScalarColumnVectorBatch<uint64_t>;
+template class ScalarColumnVectorBatch<int128_t>;
+template class ScalarColumnVectorBatch<float>;
+template class ScalarColumnVectorBatch<double>;
+template class ScalarColumnVectorBatch<decimal12_t>;
+template class ScalarColumnVectorBatch<uint24_t>;
+template class ScalarColumnVectorBatch<Slice>;
+
+} // namespace doris
diff --git a/be/src/olap/field.h b/be/src/olap/field.h
index 7382722..e15bbfd 100644
--- a/be/src/olap/field.h
+++ b/be/src/olap/field.h
@@ -40,8 +40,9 @@ namespace doris {
 // User can use this class to access or deal with column data in memory.
 class Field {
 public:
+    explicit Field() = default;
     explicit Field(const TabletColumn& column)
-        : _type_info(get_type_info(column.type())),
+        : _type_info(get_type_info(&column)),
         _key_coder(get_key_coder(column.type())),
         _name(column.name()),
         _index_size(column.index_length()),
@@ -96,7 +97,9 @@ public:
     }
 
     virtual Field* clone() const {
-        return new Field(*this);
+        auto* local = new Field();
+        this->clone(local);
+        return local;
     }
 
     // Test if these two cell is equal with each other
@@ -257,6 +260,12 @@ public:
     Status decode_ascending(Slice* encoded_key, uint8_t* cell_ptr, MemPool* pool) const {
         return _key_coder->decode_ascending(encoded_key, _index_size, cell_ptr, pool);
     }
+    void add_sub_field(std::unique_ptr<Field> sub_field) {
+        _sub_fields.emplace_back(std::move(sub_field));
+    }
+    Field* get_sub_field(int i) {
+        return _sub_fields[i].get();
+    }
 private:
     // Field的最大长度,单位为字节,通常等于length, 变长字符串不同
     const TypeInfo* _type_info;
@@ -264,6 +273,7 @@ private:
     std::string _name;
     uint16_t _index_size;
     bool _is_nullable;
+    std::vector<std::unique_ptr<Field>> _sub_fields;
 
 protected:
     const AggregateInfo* _agg_info;
@@ -277,7 +287,20 @@ protected:
         slice->size = _length;
         slice->data = (char*)pool->allocate(slice->size);
         return type_value;
-    };
+    }
+
+    void clone(Field* other) const {
+        other->_type_info = this->_type_info;
+        other->_key_coder = this->_key_coder;
+        other->_name = this->_name;
+        other->_index_size = this->_index_size;
+        other->_is_nullable = this->_is_nullable;
+        other->_sub_fields.clear();
+        for (const auto & f : _sub_fields) {
+            Field* item = f->clone();
+            other->add_sub_field(std::unique_ptr<Field>(item));
+        }
+    }
 };
 
 template<typename LhsCellType, typename RhsCellType>
@@ -364,6 +387,7 @@ uint32_t Field::hash_code(const CellType& cell, uint32_t seed) const {
 
 class CharField: public Field {
 public:
+    explicit CharField() : Field() {}
     explicit CharField(const TabletColumn& column) : Field(column) {
     }
 
@@ -395,7 +419,9 @@ public:
     }
 
     CharField* clone() const override {
-        return new CharField(*this);
+        auto* local = new CharField();
+        Field::clone(local);
+        return local;
     }
 
     char* allocate_value(MemPool* pool) const override {
@@ -411,6 +437,7 @@ public:
 
 class VarcharField: public Field {
 public:
+    explicit VarcharField() :Field() {}
     explicit VarcharField(const TabletColumn& column) : Field(column) {
     }
 
@@ -428,7 +455,9 @@ public:
     }
 
     VarcharField* clone() const override {
-        return new VarcharField(*this);
+        auto* local = new VarcharField();
+        Field::clone(local);
+        return local;
     }
 
     char* allocate_value(MemPool* pool) const override {
@@ -444,6 +473,7 @@ public:
 
 class BitmapAggField: public Field {
 public:
+    explicit BitmapAggField() : Field() {}
     explicit BitmapAggField(const TabletColumn& column) : Field(column) {
     }
 
@@ -459,12 +489,15 @@ public:
     }
 
     BitmapAggField* clone() const override {
-        return new BitmapAggField(*this);
+        auto* local = new BitmapAggField();
+        Field::clone(local);
+        return local;
     }
 };
 
 class HllAggField: public Field {
 public:
+    explicit HllAggField() : Field() {}
     explicit HllAggField(const TabletColumn& column) : Field(column) {
     }
 
@@ -480,7 +513,9 @@ public:
     }
 
     HllAggField* clone() const override {
-        return new HllAggField(*this);
+        auto* local = new HllAggField();
+        Field::clone(local);
+        return local;
     }
 };
 
@@ -494,6 +529,12 @@ public:
                     return new CharField(column);
                 case OLAP_FIELD_TYPE_VARCHAR:
                     return new VarcharField(column);
+                case OLAP_FIELD_TYPE_ARRAY: {
+                    std::unique_ptr<Field> item_field(FieldFactory::create(column.get_sub_column(0)));
+                    auto* local = new Field(column);
+                    local->add_sub_field(std::move(item_field));
+                    return local;
+                }
                 default:
                     return new Field(column);
             }
@@ -512,6 +553,12 @@ public:
                         return new CharField(column);
                     case OLAP_FIELD_TYPE_VARCHAR:
                         return new VarcharField(column);
+                    case OLAP_FIELD_TYPE_ARRAY: {
+                        std::unique_ptr<Field> item_field(FieldFactory::create(column.get_sub_column(0)));
+                        auto* local = new Field(column);
+                        local->add_sub_field(std::move(item_field));
+                        return local;
+                    }
                     default:
                         return new Field(column);
                 }
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index e1b0bf3..86b5d2f 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -132,7 +132,7 @@ enum FieldType {
     OLAP_FIELD_TYPE_VARCHAR = 17,
 
     OLAP_FIELD_TYPE_STRUCT = 18,        // Struct
-    OLAP_FIELD_TYPE_LIST = 19,          // LIST
+    OLAP_FIELD_TYPE_ARRAY = 19,          // ARRAY
     OLAP_FIELD_TYPE_MAP = 20,           // Map
     OLAP_FIELD_TYPE_UNKNOWN = 21,       // UNKNOW Type
     OLAP_FIELD_TYPE_NONE = 22,
diff --git a/be/src/olap/row_block2.cpp b/be/src/olap/row_block2.cpp
index c70477d..c1f8918 100644
--- a/be/src/olap/row_block2.cpp
+++ b/be/src/olap/row_block2.cpp
@@ -29,18 +29,20 @@ namespace doris {
 RowBlockV2::RowBlockV2(const Schema& schema, uint16_t capacity)
         : _schema(schema),
           _capacity(capacity),
-          _column_datas(_schema.num_columns(), nullptr),
-          _column_null_bitmaps(_schema.num_columns(), nullptr),
+          _column_vector_batches(_schema.num_columns()),
           _tracker(new MemTracker(-1, "RowBlockV2")),
           _pool(new MemPool(_tracker.get())),
           _selection_vector(nullptr) {
-    auto bitmap_size = BitmapSize(capacity);
     for (auto cid : _schema.column_ids()) {
-        size_t data_size = _schema.column(cid)->type_info()->size() * _capacity;
-        _column_datas[cid] = new uint8_t[data_size];
-
-        if (_schema.column(cid)->is_nullable()) {
-            _column_null_bitmaps[cid] = new uint8_t[bitmap_size];
+        Status status = ColumnVectorBatch::create(
+                _capacity,
+                _schema.column(cid)->is_nullable(),
+                _schema.column(cid)->type_info(),
+                const_cast<Field *>(_schema.column(cid)),
+                &_column_vector_batches[cid]);
+        if (!status.ok()) {
+            LOG(ERROR) << "failed to create ColumnVectorBatch for type: " << _schema.column(cid)->type();
+            return;
         }
     }
     _selection_vector = new uint16_t[_capacity];
@@ -48,12 +50,6 @@ RowBlockV2::RowBlockV2(const Schema& schema, uint16_t capacity)
 }
 
 RowBlockV2::~RowBlockV2() {
-    for (auto data : _column_datas) {
-        delete[] data;
-    }
-    for (auto null_bitmap : _column_null_bitmaps) {
-        delete[] null_bitmap;
-    }
     delete[] _selection_vector;
 }
 
@@ -64,7 +60,7 @@ Status RowBlockV2::convert_to_row_block(RowCursor* helper, RowBlock* dst) {
             for (uint16_t i = 0; i < _selected_size; ++i) {
                 uint16_t row_idx = _selection_vector[i];
                 dst->get_row(i, helper);
-                bool is_null = BitmapTest(_column_null_bitmaps[cid], row_idx);
+                bool is_null = _column_vector_batches[cid]->is_null_at(row_idx);
                 if (is_null) {
                     helper->set_null(cid);
                 } else {
diff --git a/be/src/olap/row_block2.h b/be/src/olap/row_block2.h
index 374c00f..8ab4507 100644
--- a/be/src/olap/row_block2.h
+++ b/be/src/olap/row_block2.h
@@ -74,10 +74,8 @@ public:
     // low-level API to access memory for each column block(including data array and nullmap).
     // `cid` must be one of `schema()->column_ids()`.
     ColumnBlock column_block(ColumnId cid) const {
-        const TypeInfo* type_info = _schema.column(cid)->type_info();
-        uint8_t* data = _column_datas[cid];
-        uint8_t* null_bitmap = _column_null_bitmaps[cid];
-        return ColumnBlock(type_info, data, null_bitmap, _capacity, _pool.get());
+        ColumnVectorBatch* batch = _column_vector_batches[cid].get();
+        return {batch, _pool.get()};
     }
 
     // low-level API to access the underlying memory for row at `row_idx`.
@@ -113,14 +111,10 @@ public:
 private:
     Schema _schema;
     size_t _capacity;
-    // keeps fixed-size (field_size x capacity) data vector for each column,
-    // _column_datas[cid] == null if cid is not in `_schema`.
+    // _column_vector_batches[cid] == null if cid is not in `_schema`.
     // memory are not allocated from `_pool` because we don't wan't to reallocate them in clear()
-    std::vector<uint8_t*> _column_datas;
-    // keeps null bitmap for each column,
-    // _column_null_bitmaps[cid] == null if cid is not in `_schema` or the column is not null.
-    // memory are not allocated from `_pool` because we don't wan't to reallocate them in clear()
-    std::vector<uint8_t*> _column_null_bitmaps;
+    std::vector<std::unique_ptr<ColumnVectorBatch>> _column_vector_batches;
+
     size_t _num_rows;
     // manages the memory for slice's data
     std::shared_ptr<MemTracker> _tracker;
diff --git a/be/src/olap/rowset/column_reader.cpp b/be/src/olap/rowset/column_reader.cpp
index 4921869..20df8bb 100644
--- a/be/src/olap/rowset/column_reader.cpp
+++ b/be/src/olap/rowset/column_reader.cpp
@@ -675,7 +675,7 @@ ColumnReader* ColumnReader::create(uint32_t column_id,
     }
 
     case OLAP_FIELD_TYPE_STRUCT:
-    case OLAP_FIELD_TYPE_LIST:
+    case OLAP_FIELD_TYPE_ARRAY:
     case OLAP_FIELD_TYPE_MAP:
     default: {
         LOG(WARNING) << "unsupported field type. field=" << column.name()
diff --git a/be/src/olap/rowset/column_writer.cpp b/be/src/olap/rowset/column_writer.cpp
index d2a13e7..52d6e3e 100755
--- a/be/src/olap/rowset/column_writer.cpp
+++ b/be/src/olap/rowset/column_writer.cpp
@@ -119,7 +119,7 @@ ColumnWriter* ColumnWriter::create(uint32_t column_id,
         break;
     }
     case OLAP_FIELD_TYPE_STRUCT:
-    case OLAP_FIELD_TYPE_LIST:
+    case OLAP_FIELD_TYPE_ARRAY:
     case OLAP_FIELD_TYPE_MAP:
     default: {
         LOG(WARNING) << "Unsupported field type. field=" << column.name()
diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
index ad0efe0..a12eb34 100644
--- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
+++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
@@ -197,6 +197,16 @@ Status BinaryDictPageDecoder::init() {
     _encoding_type = static_cast<EncodingTypePB>(type);
     _data.remove_prefix(BINARY_DICT_PAGE_HEADER_SIZE);
     if (_encoding_type == DICT_ENCODING) {
+        // copy the codewords into a temporary buffer first
+        // And then copy the strings corresponding to the codewords to the destination buffer
+        TypeInfo* type_info = get_scalar_type_info(OLAP_FIELD_TYPE_INT);
+
+        RETURN_IF_ERROR(ColumnVectorBatch::create(
+                0,
+                false,
+                type_info,
+                nullptr,
+                &_batch));
         _data_page_decoder.reset(new BitShufflePageDecoder<OLAP_FIELD_TYPE_INT>(_data, _options));
     } else if (_encoding_type == PLAIN_ENCODING) {
         DCHECK_EQ(_encoding_type, PLAIN_ENCODING);
@@ -235,17 +245,13 @@ Status BinaryDictPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) {
         return Status::OK();
     }
     Slice* out = reinterpret_cast<Slice*>(dst->data());
-    _code_buf.resize((*n) * sizeof(int32_t));
+    _batch->resize(*n);
 
-    // copy the codewords into a temporary buffer first
-    // And then copy the strings corresponding to the codewords to the destination buffer
-    TypeInfo *type_info = get_type_info(OLAP_FIELD_TYPE_INT);
-    // the data in page is not null
-    ColumnBlock column_block(type_info, _code_buf.data(), nullptr, *n, dst->column_block()->pool());
+    ColumnBlock column_block(_batch.get(), dst->column_block()->pool());
     ColumnBlockView tmp_block_view(&column_block);
     RETURN_IF_ERROR(_data_page_decoder->next_batch(n, &tmp_block_view));
     for (int i = 0; i < *n; ++i) {
-        int32_t codeword = *reinterpret_cast<int32_t*>(&_code_buf[i * sizeof(int32_t)]);
+        int32_t codeword = *reinterpret_cast<const int32_t*>(column_block.cell_ptr(i));
         // get the string from the dict decoder
         Slice element = _dict_decoder->string_at_index(codeword);
         if (element.size > 0) {
diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.h b/be/src/olap/rowset/segment_v2/binary_dict_page.h
index cb3b5e2..8b71691 100644
--- a/be/src/olap/rowset/segment_v2/binary_dict_page.h
+++ b/be/src/olap/rowset/segment_v2/binary_dict_page.h
@@ -28,6 +28,7 @@
 #include "olap/rowset/segment_v2/options.h"
 #include "olap/rowset/segment_v2/common.h"
 #include "olap/column_block.h"
+#include "olap/column_vector.h"
 #include "runtime/mem_pool.h"
 #include "runtime/mem_tracker.h"
 #include "gen_cpp/segment_v2.pb.h"
@@ -126,7 +127,8 @@ private:
     const BinaryPlainPageDecoder* _dict_decoder = nullptr;
     bool _parsed;
     EncodingTypePB _encoding_type;
-    faststring _code_buf;
+    // use as data buf.
+    std::unique_ptr<ColumnVectorBatch> _batch;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp b/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp
index 0a66511..392994f 100644
--- a/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp
@@ -48,17 +48,18 @@ Status BitmapIndexIterator::seek_dictionary(const void* value, bool* exact_match
 Status BitmapIndexIterator::read_bitmap(rowid_t ordinal, Roaring* result) {
     DCHECK(0 <= ordinal && ordinal < _reader->bitmap_nums());
 
-    Slice value;
-    uint8_t nullmap;
     size_t num_to_read = 1;
-    ColumnBlock block(_reader->type_info(), (uint8_t*) &value, &nullmap, num_to_read, _pool.get());
+    std::unique_ptr<ColumnVectorBatch> cvb;
+    RETURN_IF_ERROR(ColumnVectorBatch::create(num_to_read, false, _reader->type_info(), nullptr, &cvb));
+    ColumnBlock block(cvb.get(), _pool.get());
     ColumnBlockView column_block_view(&block);
 
     RETURN_IF_ERROR(_bitmap_column_iter.seek_to_ordinal(ordinal));
     size_t num_read = num_to_read;
     RETURN_IF_ERROR(_bitmap_column_iter.next_batch(&num_read, &column_block_view));
     DCHECK(num_to_read == num_read);
-    *result = Roaring::read(value.data, false);
+
+    *result = Roaring::read(reinterpret_cast<const Slice*>(block.data())->data, false);
     _pool->clear();
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/segment_v2/bitshuffle_page.h b/be/src/olap/rowset/segment_v2/bitshuffle_page.h
index 233fc21..9f886e7 100644
--- a/be/src/olap/rowset/segment_v2/bitshuffle_page.h
+++ b/be/src/olap/rowset/segment_v2/bitshuffle_page.h
@@ -334,6 +334,11 @@ public:
     }
 
     Status next_batch(size_t* n, ColumnBlockView* dst) override {
+        return next_batch<true>(n, dst);
+    }
+
+    template<bool forward_index>
+    inline Status next_batch(size_t* n, ColumnBlockView* dst) {
         DCHECK(_parsed);
         if (PREDICT_FALSE(*n == 0 || _cur_index >= _num_elements)) {
             *n = 0;
@@ -343,11 +348,17 @@ public:
         size_t max_fetch = std::min(*n, static_cast<size_t>(_num_elements - _cur_index));
         _copy_next_values(max_fetch, dst->data());
         *n = max_fetch;
-        _cur_index += max_fetch;
+        if (forward_index) {
+            _cur_index += max_fetch;
+        }
 
         return Status::OK();
     }
 
+    Status peek_next_batch(size_t *n, ColumnBlockView* dst) override { // same copy as next_batch(), but _cur_index is not advanced
+        return next_batch<false>(n, dst);
+    }
+
     size_t count() const override {
         return _num_elements;
     }
diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp b/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp
index 1cb5418..8cd354b 100644
--- a/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp
@@ -37,10 +37,10 @@ Status BloomFilterIndexReader::new_iterator(std::unique_ptr<BloomFilterIndexIter
 }
 
 Status BloomFilterIndexIterator::read_bloom_filter(rowid_t ordinal, std::unique_ptr<BloomFilter>* bf) {
-    Slice value;
-    uint8_t nullmap = 0;
     size_t num_to_read = 1;
-    ColumnBlock block(_reader->type_info(), (uint8_t*)&value, &nullmap, num_to_read, _pool.get());
+    std::unique_ptr<ColumnVectorBatch> cvb;
+    RETURN_IF_ERROR(ColumnVectorBatch::create(num_to_read, false, _reader->type_info(), nullptr, &cvb));
+    ColumnBlock block(cvb.get(), _pool.get());
     ColumnBlockView column_block_view(&block);
 
     RETURN_IF_ERROR(_bloom_filter_iter.seek_to_ordinal(ordinal));
@@ -49,7 +49,8 @@ Status BloomFilterIndexIterator::read_bloom_filter(rowid_t ordinal, std::unique_
     DCHECK(num_to_read == num_read);
     // construct bloom filter
     BloomFilter::create(_reader->_bloom_filter_index_meta->algorithm(), bf);
-    RETURN_IF_ERROR((*bf)->init(value.data, value.size, _reader->_bloom_filter_index_meta->hash_strategy()));
+    const Slice* value_ptr = reinterpret_cast<const Slice*>(block.data());
+    RETURN_IF_ERROR((*bf)->init(value_ptr->data, value_ptr->size, _reader->_bloom_filter_index_meta->hash_strategy()));
     _pool->clear();
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp
index 0044622..f13268f 100644
--- a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp
@@ -134,7 +134,7 @@ public:
         meta->set_algorithm(BLOCK_BLOOM_FILTER);
 
         // write bloom filters
-        const TypeInfo* bf_typeinfo = get_type_info(OLAP_FIELD_TYPE_VARCHAR);
+        const TypeInfo* bf_typeinfo = get_scalar_type_info(OLAP_FIELD_TYPE_VARCHAR);
         IndexedColumnWriterOptions options;
         options.write_ordinal_index = true;
         options.write_value_index = false;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index fd5015c..7da6150 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -41,24 +41,50 @@ Status ColumnReader::create(const ColumnReaderOptions& opts,
                             uint64_t num_rows,
                             const std::string& file_name,
                             std::unique_ptr<ColumnReader>* reader) {
-    std::unique_ptr<ColumnReader> reader_local(
-        new ColumnReader(opts, meta, num_rows, file_name));
-    RETURN_IF_ERROR(reader_local->init());
-    *reader = std::move(reader_local);
-    return Status::OK();
+    if (is_scalar_type((FieldType)meta.type())) {
+        std::unique_ptr<ColumnReader> reader_local(
+                new ColumnReader(opts, meta, num_rows, file_name));
+        RETURN_IF_ERROR(reader_local->init());
+        *reader = std::move(reader_local);
+        return Status::OK();
+    } else {
+        auto type = (FieldType)meta.type();
+        switch(type) {
+            case FieldType::OLAP_FIELD_TYPE_ARRAY: {
+                std::unique_ptr<ColumnReader> item_reader;
+                DCHECK(meta.children_columns_size() == 1);
+                RETURN_IF_ERROR(ColumnReader::create(opts,
+                                                     meta.children_columns(0),
+                                                     meta.children_columns(0).num_rows(),
+                                                     file_name,
+                                                     &item_reader));
+                RETURN_IF_ERROR(item_reader->init());
+
+                std::unique_ptr<ColumnReader> array_reader(
+                        new ColumnReader(opts, meta, num_rows, file_name));
+                RETURN_IF_ERROR(array_reader->init());
+                array_reader->_sub_readers.resize(1);
+                array_reader->_sub_readers[0] = std::move(item_reader);
+                *reader = std::move(array_reader);
+                return Status::OK();
+            }
+            default:
+                return Status::NotSupported("unsupported type for ColumnReader: " + std::to_string(type));
+        }
+    }
 }
 
 ColumnReader::ColumnReader(const ColumnReaderOptions& opts,
                            const ColumnMetaPB& meta,
                            uint64_t num_rows,
                            const std::string& file_name)
-        : _opts(opts), _meta(meta), _num_rows(num_rows), _file_name(file_name) {
+        :_meta(meta), _opts(opts), _num_rows(num_rows), _file_name(file_name) {
 }
 
 ColumnReader::~ColumnReader() = default;
 
 Status ColumnReader::init() {
-    _type_info = get_type_info((FieldType)_meta.type());
+    _type_info = get_type_info(&_meta);
     if (_type_info == nullptr) {
         return Status::NotSupported(Substitute("unsupported typeinfo, type=$0", _meta.type()));
     }
@@ -92,11 +118,6 @@ Status ColumnReader::init() {
     return Status::OK();
 }
 
-Status ColumnReader::new_iterator(ColumnIterator** iterator) {
-    *iterator = new FileColumnIterator(this);
-    return Status::OK();
-}
-
 Status ColumnReader::new_bitmap_index_iterator(BitmapIndexIterator** iterator) {
     RETURN_IF_ERROR(_ensure_index_loaded());
     RETURN_IF_ERROR(_bitmap_index->new_iterator(iterator));
@@ -296,6 +317,115 @@ Status ColumnReader::seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterat
     return Status::OK();
 }
 
+Status ColumnReader::new_iterator(ColumnIterator** iterator) { // allocates an iterator matching _meta.type() and stores it in *iterator
+    if (is_scalar_type((FieldType)_meta.type())) {
+        *iterator = new FileColumnIterator(this);
+        return Status::OK();
+    } else {
+        auto type = (FieldType)_meta.type();
+        switch(type) {
+            case FieldType::OLAP_FIELD_TYPE_ARRAY: {
+                ColumnIterator* item_iterator; // filled in by the item sub-reader on the next line
+                RETURN_IF_ERROR(_sub_readers[0]->new_iterator(&item_iterator));
+                FileColumnIterator* offset_iterator = new FileColumnIterator(this); // this reader's own column is read as the offsets
+                *iterator = new ArrayFileColumnIterator(offset_iterator, item_iterator); // both raw pointers are handed to the array iterator
+                return Status::OK();
+            }
+            default:
+                return Status::NotSupported("unsupported type to create iterator: " + std::to_string(type));
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Takes ownership of both raw iterators by storing them in the unique_ptr members.
+ArrayFileColumnIterator::ArrayFileColumnIterator(FileColumnIterator* offset_reader, ColumnIterator* item_iterator) {
+    _offset_iterator.reset(offset_reader);
+    _item_iterator.reset(item_iterator);
+}
+// Initializes both child iterators with `opts` and allocates the BIGINT batch used to hold offsets.
+Status ArrayFileColumnIterator::init(const ColumnIteratorOptions& opts) {
+    RETURN_IF_ERROR(_offset_iterator->init(opts));
+    RETURN_IF_ERROR(_item_iterator->init(opts));
+    TypeInfo* bigint_type_info = get_scalar_type_info(FieldType::OLAP_FIELD_TYPE_BIGINT);
+    RETURN_IF_ERROR(ColumnVectorBatch::create(1024, _offset_iterator->is_nullable(), bigint_type_info, nullptr, &_offset_batch)); // 1024 is only the starting size; next_batch() resizes the batch per call
+    return Status::OK();
+}
+
+// Each call overwrites _offset_batch, so this method is not thread safe.
+Status ArrayFileColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, bool* has_null) {
+    // 1. read n offsets into _offset_batch (sized n + 1 so the extra end offset of step 2 fits).
+    RETURN_IF_ERROR(_offset_batch->resize(*n + 1));
+    ColumnBlock ordinal_block(_offset_batch.get(), nullptr);
+    ColumnBlockView ordinal_view(&ordinal_block);
+    RETURN_IF_ERROR(_offset_iterator->next_batch(n, &ordinal_view, has_null));
+
+    if (*n == 0) {
+        return Status::OK();
+    }
+
+    // 2. n arrays need n + 1 offsets, so read one more offset here without consuming it.
+    PageDecoder* offset_page_decoder = _offset_iterator->get_current_page()->data_decoder;
+    if (offset_page_decoder->has_remaining()) { // not _page->has_remaining()
+        size_t i = 1;
+        RETURN_IF_ERROR(offset_page_decoder->peek_next_batch(&i, &ordinal_view)); // not null
+        DCHECK(i == 1);
+    } else {
+        *(reinterpret_cast<ordinal_t*>(ordinal_view.data())) =
+                _offset_iterator->get_current_page()->next_array_item_ordinal;
+    }
+    ordinal_view.set_null_bits(1, false);
+    ordinal_view.advance(1);
+
+    // 3. For nullable data, fill null ordinals from last to first: 0 N N 3 N 5 -> 0 3 3 3 5 5
+    if (_offset_iterator->is_nullable()) {
+        size_t j = *n + 1;
+        while(--j > 0) { // j can not be less than 0
+            ColumnBlockCell cell = ordinal_block.cell(j - 1);
+            if (cell.is_null()) {
+                ordinal_t pre = *(reinterpret_cast<ordinal_t*>(ordinal_block.cell(j).mutable_cell_ptr()));
+                *(reinterpret_cast<ordinal_t*>(cell.mutable_cell_ptr())) = pre;
+            }
+        }
+    }
+
+    // 4. read child column's data and generate collections.
+    ColumnBlock* collection_block = dst->column_block();
+    auto* collection_batch =
+            reinterpret_cast<ArrayColumnVectorBatch*>(collection_block->vector_batch());
+    size_t start_offset = dst->current_offset();
+    size_t end_offset = start_offset + *n;
+    auto* ordinals = reinterpret_cast<ordinal_t *>(ordinal_block.data());
+    collection_batch->put_item_ordinal(ordinals, start_offset, *n + 1);
+
+    size_t size_to_read = ordinals[*n] - ordinals[0];
+    bool item_has_null = false;
+    if (size_to_read > 0) {
+        RETURN_IF_ERROR(_item_iterator->seek_to_ordinal(ordinals[0])); // a failed seek must not be followed by a read
+        ColumnVectorBatch* item_vector_batch = collection_batch->elements();
+        RETURN_IF_ERROR(item_vector_batch->resize(collection_batch->item_offset(end_offset)));
+        ColumnBlock item_block = ColumnBlock(item_vector_batch, dst->pool());
+        ColumnBlockView item_view = ColumnBlockView(&item_block, collection_batch->item_offset(start_offset));
+        size_t real_read = size_to_read;
+        RETURN_IF_ERROR(_item_iterator->next_batch(&real_read, &item_view, &item_has_null));
+        DCHECK(size_to_read == real_read);
+    }
+
+    if (dst->is_nullable()) {
+        bool* collection_nulls = const_cast<bool *>(&collection_batch->null_signs()[dst->current_offset()]);
+        memcpy(collection_nulls, ordinal_block.vector_batch()->null_signs(), sizeof(bool) * *n);
+        dst->advance(*n);
+    } else {
+        dst->set_null_bits(*n, false);
+        dst->advance(*n);
+    }
+
+    collection_batch->prepare_for_read(0, end_offset, item_has_null);
+    return Status::OK();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
 FileColumnIterator::FileColumnIterator(ColumnReader* reader) : _reader(reader) {
 }
 
@@ -349,8 +479,9 @@ void FileColumnIterator::_seek_to_pos_in_page(ParsedPage* page, ordinal_t offset
     page->offset_in_page = offset_in_page;
 }
 
-Status FileColumnIterator::next_batch(size_t* n, ColumnBlockView* dst) {
+Status FileColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, bool* has_null) {
     size_t remaining = *n;
+    *has_null = false;
     while (remaining > 0) {
         if (!_page->has_remaining()) {
             bool eos = false;
@@ -385,6 +516,8 @@ Status FileColumnIterator::next_batch(size_t* n, ColumnBlockView* dst) {
                 if (!is_null) {
                     RETURN_IF_ERROR(_page->data_decoder->next_batch(&num_rows, dst));
                     DCHECK_EQ(this_run, num_rows);
+                } else {
+                    *has_null = true;
                 }
 
                 // set null bits
@@ -493,27 +626,29 @@ Status DefaultValueColumnIterator::init(const ColumnIteratorOptions& opts) {
             DCHECK(_is_nullable);
             _is_default_value_null = true;
         } else {
-            TypeInfo* type_info = get_type_info(_type);
-            _type_size = type_info->size();
+            _type_size = _type_info->size();
             _mem_value = reinterpret_cast<void*>(_pool->allocate(_type_size));
             OLAPStatus s = OLAP_SUCCESS;
-            if (_type == OLAP_FIELD_TYPE_CHAR) {
+            if (_type_info->type() == OLAP_FIELD_TYPE_CHAR) {
                 int32_t length = _schema_length;
                 char* string_buffer = reinterpret_cast<char*>(_pool->allocate(length));
                 memset(string_buffer, 0, length);
                 memory_copy(string_buffer, _default_value.c_str(), _default_value.length());
                 ((Slice*)_mem_value)->size = length;
                 ((Slice*)_mem_value)->data = string_buffer;
-            } else if ( _type == OLAP_FIELD_TYPE_VARCHAR ||
-                _type == OLAP_FIELD_TYPE_HLL ||
-                _type == OLAP_FIELD_TYPE_OBJECT ) {
+            } else if (_type_info->type() == OLAP_FIELD_TYPE_VARCHAR ||
+                _type_info->type() == OLAP_FIELD_TYPE_HLL ||
+                _type_info->type() == OLAP_FIELD_TYPE_OBJECT ) {
                 int32_t length = _default_value.length();
                 char* string_buffer = reinterpret_cast<char*>(_pool->allocate(length));
                 memory_copy(string_buffer, _default_value.c_str(), length);
                 ((Slice*)_mem_value)->size = length;
                 ((Slice*)_mem_value)->data = string_buffer;
+            } else if (_type_info->type() == OLAP_FIELD_TYPE_ARRAY) {
+                // TODO llj for Array default value
+                return Status::NotSupported("Array default type is unsupported");
             } else {
-                s = type_info->from_string(_mem_value, _default_value);
+                s = _type_info->from_string(_mem_value, _default_value);
             }
             if (s != OLAP_SUCCESS) {
                 return Status::InternalError(
@@ -529,14 +664,16 @@ Status DefaultValueColumnIterator::init(const ColumnIteratorOptions& opts) {
     return Status::OK();
 }
 
-Status DefaultValueColumnIterator::next_batch(size_t* n, ColumnBlockView* dst) {
+Status DefaultValueColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, bool* has_null) {
     if (dst->is_nullable()) {
         dst->set_null_bits(*n, _is_default_value_null);
     }
 
     if (_is_default_value_null) {
+        *has_null = true;
         dst->advance(*n);
     } else {
+        *has_null = false;
         for (int i = 0; i < *n; ++i) {
             memcpy(dst->data(), _mem_value, _type_size);
             dst->advance(1);
@@ -545,5 +682,5 @@ Status DefaultValueColumnIterator::next_batch(size_t* n, ColumnBlockView* dst) {
     return Status::OK();
 }
 
-}
-}
+} // namespace segment_v2
+} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index 39ae159..4068253 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -106,7 +106,6 @@ public:
     bool is_nullable() const { return _meta.is_nullable(); }
 
     const EncodingInfo* encoding_info() const { return _encoding_info; }
-    const TypeInfo* type_info() const { return _type_info; }
 
     bool has_zone_map() const { return _zone_map_index_meta != nullptr; }
     bool has_bitmap_index() const { return _bitmap_index_meta != nullptr; }
@@ -137,6 +136,7 @@ private:
                  const std::string& file_name);
     Status init();
 
+
     // Read and load necessary column indexes into memory if it hasn't been loaded.
     // May be called multiple times, subsequent calls will no op.
     Status _ensure_index_loaded() {
@@ -172,15 +172,15 @@ private:
     Status _calculate_row_ranges(const std::vector<uint32_t>& page_indexes, RowRanges* row_ranges);
 
 private:
-    ColumnReaderOptions _opts;
     ColumnMetaPB _meta;
+    ColumnReaderOptions _opts;
     uint64_t _num_rows;
     std::string _file_name;
 
-    // initialized in init()
-    const TypeInfo* _type_info = nullptr;
-    const EncodingInfo* _encoding_info = nullptr;
-    const BlockCompressionCodec* _compress_codec = nullptr;
+    const TypeInfo* _type_info = nullptr; // initialized in init(), may changed by subclasses.
+    const EncodingInfo* _encoding_info = nullptr; // initialized in init(), used for create PageDecoder
+    const BlockCompressionCodec* _compress_codec = nullptr; // initialized in init()
+
     // meta for various column indexes (null if the index is absent)
     const ZoneMapIndexPB* _zone_map_index_meta = nullptr;
     const OrdinalIndexPB* _ordinal_index_meta = nullptr;
@@ -192,13 +192,15 @@ private:
     std::unique_ptr<OrdinalIndexReader> _ordinal_index;
     std::unique_ptr<BitmapIndexReader> _bitmap_index;
     std::unique_ptr<BloomFilterIndexReader> _bloom_filter_index;
+
+    std::vector<std::unique_ptr<ColumnReader>> _sub_readers;
 };
 
 // Base iterator to read one column data
 class ColumnIterator {
 public:
-    ColumnIterator() { }
-    virtual ~ColumnIterator() { }
+    ColumnIterator() = default;
+    virtual ~ColumnIterator() = default;
 
     virtual Status init(const ColumnIteratorOptions& opts) {
         _opts = opts;
@@ -214,10 +216,15 @@ public:
     // then returns false.
     virtual Status seek_to_ordinal(ordinal_t ord) = 0;
 
+    Status next_batch(size_t* n, ColumnBlockView* dst) { // convenience overload for callers that do not need has_null
+        bool has_null; // receives the flag from the three-argument overload and is then discarded
+        return next_batch(n, dst, &has_null);
+    }
+
     // After one seek, we can call this function many times to read data
     // into ColumnBlockView. when read string type data, memory will allocated
     // from MemPool
-    virtual Status next_batch(size_t* n, ColumnBlockView* dst) = 0;
+    virtual Status next_batch(size_t* n, ColumnBlockView* dst, bool* has_null) = 0;
 
     virtual ordinal_t get_current_ordinal() const = 0;
 
@@ -252,16 +259,17 @@ protected:
 };
 
 // This iterator is used to read column data from file
-class FileColumnIterator : public ColumnIterator {
+// for scalar type
+class FileColumnIterator final : public ColumnIterator {
 public:
-    FileColumnIterator(ColumnReader* reader);
+    explicit FileColumnIterator(ColumnReader* reader);
     ~FileColumnIterator() override;
 
     Status seek_to_first() override;
 
     Status seek_to_ordinal(ordinal_t ord) override;
 
-    Status next_batch(size_t* n, ColumnBlockView* dst) override;
+    Status next_batch(size_t* n, ColumnBlockView* dst, bool* has_null) override;
 
     ordinal_t get_current_ordinal() const override { return _current_ordinal; }
 
@@ -274,6 +282,10 @@ public:
 
     Status get_row_ranges_by_bloom_filter(CondColumn* cond_column, RowRanges* row_ranges) override;
 
+    ParsedPage* get_current_page() { return _page.get(); }
+
+    bool is_nullable() { return _reader->is_nullable(); }
+
 private:
     void _seek_to_pos_in_page(ParsedPage* page, ordinal_t offset_in_page);
     Status _load_next_page(bool* eos);
@@ -305,15 +317,37 @@ private:
     std::unordered_set<uint32_t> _delete_partial_satisfied_pages;
 };
 
+class ArrayFileColumnIterator final : public ColumnIterator {
+public:
+    explicit ArrayFileColumnIterator(FileColumnIterator* offset_iterator, ColumnIterator* item_iterator);
+
+    ~ArrayFileColumnIterator() override = default;
+
+    Status init(const ColumnIteratorOptions& opts) override;
+
+    Status next_batch(size_t* n, ColumnBlockView* dst, bool* has_null) override;
+
+    Status seek_to_first() override { return _offset_iterator->seek_to_first(); };
+
+    Status seek_to_ordinal(ordinal_t ord) override { return _offset_iterator->seek_to_ordinal(ord); };
+
+    ordinal_t get_current_ordinal() const override { return _offset_iterator->get_current_ordinal(); }
+
+private:
+    std::unique_ptr<FileColumnIterator> _offset_iterator;
+    std::unique_ptr<ColumnIterator> _item_iterator;
+    std::unique_ptr<ColumnVectorBatch> _offset_batch;
+};
+
 // This iterator is used to read default value column
 class DefaultValueColumnIterator : public ColumnIterator {
 public:
     DefaultValueColumnIterator(bool has_default_value, const std::string& default_value,
-                               bool is_nullable, FieldType type, size_t schema_length)
+                               bool is_nullable, TypeInfo* type_info, size_t schema_length)
             : _has_default_value(has_default_value),
               _default_value(default_value),
               _is_nullable(is_nullable),
-              _type(type),
+              _type_info(type_info),
               _schema_length(schema_length),
               _is_default_value_null(false),
               _type_size(0),
@@ -332,7 +366,7 @@ public:
         return Status::OK();
     }
 
-    Status next_batch(size_t* n, ColumnBlockView* dst) override;
+    Status next_batch(size_t* n, ColumnBlockView* dst, bool* has_null) override;
 
     ordinal_t get_current_ordinal() const override { return _current_rowid; }
 
@@ -340,7 +374,7 @@ private:
     bool _has_default_value;
     std::string _default_value;
     bool _is_nullable;
-    FieldType _type;
+    TypeInfo* _type_info;
     size_t _schema_length;
     bool _is_default_value_null;
     size_t _type_size;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 1ba59bf..cb7420f 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -77,13 +77,87 @@ private:
     RleEncoder<bool> _rle_encoder;
 };
 
-ColumnWriter::ColumnWriter(const ColumnWriterOptions& opts,
+Status ColumnWriter::create(const ColumnWriterOptions& opts,
+                            const TabletColumn* column,
+                            fs::WritableBlock* _wblock,
+                            std::unique_ptr<ColumnWriter>* writer) {
+    std::unique_ptr<Field> field(FieldFactory::create(*column));
+    DCHECK(field.get() != nullptr);
+    if (is_scalar_type(column->type())) {
+        std::unique_ptr<ColumnWriter> writer_local =
+                std::unique_ptr<ColumnWriter>(new ScalarColumnWriter(opts, std::move(field), _wblock));
+        *writer = std::move(writer_local);
+        return Status::OK();
+    } else {
+        switch(column->type()) {
+            case FieldType::OLAP_FIELD_TYPE_ARRAY: {
+                DCHECK(column->get_subtype_count() == 1);
+                const TabletColumn& item_column = column->get_sub_column(0);
+
+                ColumnWriterOptions item_options;
+                item_options.meta = opts.meta->mutable_children_columns(0);
+                item_options.need_zone_map = false;
+                item_options.need_bloom_filter = item_column.is_bf_column();
+                item_options.need_bitmap_index = item_column.has_bitmap_index();
+                if (item_column.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+                    if (item_options.need_bloom_filter) {
+                        return Status::NotSupported("Do not support bloom filter for array type" );
+                    }
+                    if (item_options.need_bitmap_index) {
+                        return Status::NotSupported("Do not support bitmap index for array type" );
+                    }
+                }
+
+                std::unique_ptr<ColumnWriter> item_writer;
+                RETURN_IF_ERROR(ColumnWriter::create(item_options, &item_column, _wblock, &item_writer));
+
+                std::unique_ptr<Field> bigint_field(FieldFactory::create_by_type(FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT));
+
+                ScalarColumnWriter* offset_writer = new ScalarColumnWriter(opts, std::move(bigint_field), _wblock);
+
+                std::unique_ptr<ColumnWriter> writer_local =
+                        std::unique_ptr<ColumnWriter>(
+                                new ArrayColumnWriter(opts,
+                                                         std::move(field),
+                                                         offset_writer,
+                                                         std::move(item_writer)));
+                *writer = std::move(writer_local);
+                return Status::OK();
+            }
+            default:
+                return Status::NotSupported("unsupported type for ColumnWriter: " + std::to_string(field->type()));
+        }
+    }
+}
+
+Status ColumnWriter::append_nullable(
+        const uint8_t* is_null_bits, const void* data, size_t num_rows) {
+    const uint8_t* ptr = (const uint8_t*)data;
+    BitmapIterator null_iter(is_null_bits, num_rows);
+    bool is_null = false;
+    size_t this_run = 0;
+    while ((this_run = null_iter.Next(&is_null)) > 0) {
+        if (is_null) {
+            RETURN_IF_ERROR(append_nulls(this_run));
+        } else {
+            RETURN_IF_ERROR(append_data(&ptr, this_run));
+        }
+    }
+    return Status::OK();
+}
+
+Status ColumnWriter::append_not_nulls(const void* data, size_t num_rows) {
+    return append_data((const uint8_t**)&data, num_rows);
+}
+
+///////////////////////////////////////////////////////////////////////////////////
+
+ScalarColumnWriter::ScalarColumnWriter(const ColumnWriterOptions& opts,
                            std::unique_ptr<Field> field,
                            fs::WritableBlock* wblock) :
+        ColumnWriter(std::move(field), opts.meta->is_nullable()),
         _opts(opts),
-        _field(std::move(field)),
         _wblock(wblock),
-        _is_nullable(_opts.meta->is_nullable()),
         _data_size(0) {
     // these opts.meta fields should be set by client
     DCHECK(opts.meta->has_column_id());
@@ -96,7 +170,7 @@ ColumnWriter::ColumnWriter(const ColumnWriterOptions& opts,
     DCHECK(wblock != nullptr);
 }
 
-ColumnWriter::~ColumnWriter() {
+ScalarColumnWriter::~ScalarColumnWriter() {
     // delete all pages
     Page* page = _pages.head;
     while (page != nullptr) {
@@ -106,46 +180,46 @@ ColumnWriter::~ColumnWriter() {
     }
 }
 
-Status ColumnWriter::init() {
-    RETURN_IF_ERROR(EncodingInfo::get(_field->type_info(), _opts.meta->encoding(), &_encoding_info));
-    _opts.meta->set_encoding(_encoding_info->encoding());
-    // should store more concrete encoding type instead of DEFAULT_ENCODING
-    // because the default encoding of a data type can be changed in the future
-    DCHECK_NE(_opts.meta->encoding(), DEFAULT_ENCODING);
-
+Status ScalarColumnWriter::init() {
     RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec));
 
-    // create page builder
     PageBuilder* page_builder = nullptr;
+
+    RETURN_IF_ERROR(EncodingInfo::get(get_field()->type_info(), _opts.meta->encoding(), &_encoding_info));
+    _opts.meta->set_encoding(_encoding_info->encoding());
+    // create page builder
     PageBuilderOptions opts;
     opts.data_page_size = _opts.data_page_size;
     RETURN_IF_ERROR(_encoding_info->create_page_builder(opts, &page_builder));
     if (page_builder == nullptr) {
         return Status::NotSupported(
-            Substitute("Failed to create page builder for type $0 and encoding $1",
-                       _field->type(), _opts.meta->encoding()));
+                Substitute("Failed to create page builder for type $0 and encoding $1",
+                           get_field()->type(), _opts.meta->encoding()));
     }
+    // should store more concrete encoding type instead of DEFAULT_ENCODING
+    // because the default encoding of a data type can be changed in the future
+    DCHECK_NE(_opts.meta->encoding(), DEFAULT_ENCODING);
     _page_builder.reset(page_builder);
     // create ordinal builder
     _ordinal_index_builder.reset(new OrdinalIndexWriter());
     // create null bitmap builder
-    if (_is_nullable) {
+    if (is_nullable()) {
         _null_bitmap_builder.reset(new NullBitmapBuilder());
     }
     if (_opts.need_zone_map) {
-        _zone_map_index_builder.reset(new ZoneMapIndexWriter(_field.get()));
+        _zone_map_index_builder.reset(new ZoneMapIndexWriter(get_field()));
     }
     if (_opts.need_bitmap_index) {
-        RETURN_IF_ERROR(BitmapIndexWriter::create(_field->type_info(), &_bitmap_index_builder));
+        RETURN_IF_ERROR(BitmapIndexWriter::create(get_field()->type_info(), &_bitmap_index_builder));
     }
     if (_opts.need_bloom_filter) {
         RETURN_IF_ERROR(BloomFilterIndexWriter::create(BloomFilterOptions(),
-                _field->type_info(), &_bloom_filter_index_builder));
+                get_field()->type_info(), &_bloom_filter_index_builder));
     }
     return Status::OK();
 }
 
-Status ColumnWriter::append_nulls(size_t num_rows) {
+Status ScalarColumnWriter::append_nulls(size_t num_rows) {
     _null_bitmap_builder->add_run(true, num_rows);
     _next_rowid += num_rows;
     if (_opts.need_zone_map) {
@@ -160,14 +234,10 @@ Status ColumnWriter::append_nulls(size_t num_rows) {
     return Status::OK();
 }
 
-Status ColumnWriter::append(const void* data, size_t num_rows) {
-    return _append_data((const uint8_t**)&data, num_rows);
-}
-
 // append data to page builder. this function will make sure that
 // num_rows must be written before return. And ptr will be modified
 // to next data should be written
-Status ColumnWriter::_append_data(const uint8_t** ptr, size_t num_rows) {
+Status ScalarColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
     size_t remaining = num_rows;
     while (remaining > 0) {
         size_t num_written = remaining;
@@ -185,50 +255,26 @@ Status ColumnWriter::_append_data(const uint8_t** ptr, size_t num_rows) {
         bool is_page_full = (num_written < remaining);
         remaining -= num_written;
         _next_rowid += num_written;
-        *ptr += _field->size() * num_written;
+        *ptr += get_field()->size() * num_written;
         // we must write null bits after write data, because we don't
         // know how many rows can be written into current page
-        if (_is_nullable) {
+        if (is_nullable()) {
             _null_bitmap_builder->add_run(false, num_written);
         }
 
         if (is_page_full) {
-            RETURN_IF_ERROR(_finish_current_page());
+            RETURN_IF_ERROR(finish_current_page());
         }
     }
     return Status::OK();
 }
 
-Status ColumnWriter::append_nullable(
-        const uint8_t* is_null_bits, const void* data, size_t num_rows) {
-    const uint8_t* ptr = (const uint8_t*)data;
-    BitmapIterator null_iter(is_null_bits, num_rows);
-    bool is_null = false;
-    size_t this_run = 0;
-    while ((this_run = null_iter.Next(&is_null)) > 0) {
-        if (is_null) {
-            _null_bitmap_builder->add_run(true, this_run);
-            _next_rowid += this_run;
-            if (_opts.need_zone_map) {
-                _zone_map_index_builder->add_nulls(this_run);
-            }
-            if (_opts.need_bitmap_index) {
-                _bitmap_index_builder->add_nulls(this_run);
-            }
-            if (_opts.need_bloom_filter) {
-                _bloom_filter_index_builder->add_nulls(this_run);
-            }
-        } else {
-            RETURN_IF_ERROR(_append_data(&ptr, this_run));
-        }
-    }
-    return Status::OK();
-}
 
-uint64_t ColumnWriter::estimate_buffer_size() {
+
+uint64_t ScalarColumnWriter::estimate_buffer_size() {
     uint64_t size = _data_size;
     size += _page_builder->size();
-    if (_is_nullable) {
+    if (is_nullable()) {
         size += _null_bitmap_builder->size();
     }
     size += _ordinal_index_builder->size();
@@ -244,11 +290,13 @@ uint64_t ColumnWriter::estimate_buffer_size() {
     return size;
 }
 
-Status ColumnWriter::finish() {
-    return _finish_current_page();
+Status ScalarColumnWriter::finish() {
+    RETURN_IF_ERROR(finish_current_page());
+    _opts.meta->set_num_rows(_next_rowid);
+    return Status::OK();
 }
 
-Status ColumnWriter::write_data() {
+Status ScalarColumnWriter::write_data() {
     Page* page = _pages.head;
     while (page != nullptr) {
         RETURN_IF_ERROR(_write_data_page(page));
@@ -273,25 +321,25 @@ Status ColumnWriter::write_data() {
     return Status::OK();
 }
 
-Status ColumnWriter::write_ordinal_index() {
+Status ScalarColumnWriter::write_ordinal_index() {
     return _ordinal_index_builder->finish(_wblock, _opts.meta->add_indexes());
 }
 
-Status ColumnWriter::write_zone_map() {
+Status ScalarColumnWriter::write_zone_map() {
     if (_opts.need_zone_map) {
         return _zone_map_index_builder->finish(_wblock, _opts.meta->add_indexes());
     }
     return Status::OK();
 }
 
-Status ColumnWriter::write_bitmap_index() {
+Status ScalarColumnWriter::write_bitmap_index() {
     if (_opts.need_bitmap_index) {
         return _bitmap_index_builder->finish(_wblock, _opts.meta->add_indexes());
     }
     return Status::OK();
 }
 
-Status ColumnWriter::write_bloom_filter_index() {
+Status ScalarColumnWriter::write_bloom_filter_index() {
     if (_opts.need_bloom_filter) {
         return _bloom_filter_index_builder->finish(_wblock, _opts.meta->add_indexes());
     }
@@ -299,7 +347,7 @@ Status ColumnWriter::write_bloom_filter_index() {
 }
 
 // write a data page into file and update ordinal index
-Status ColumnWriter::_write_data_page(Page* page) {
+Status ScalarColumnWriter::_write_data_page(Page* page) {
     PagePointer pp;
     std::vector<Slice> compressed_body;
     for (auto& data : page->data) {
@@ -310,7 +358,7 @@ Status ColumnWriter::_write_data_page(Page* page) {
     return Status::OK();
 }
 
-Status ColumnWriter::_finish_current_page() {
+Status ScalarColumnWriter::finish_current_page() {
     if (_next_rowid == _first_rowid) {
         return Status::OK();
     }
@@ -330,7 +378,7 @@ Status ColumnWriter::_finish_current_page() {
     body.push_back(encoded_values.slice());
 
     OwnedSlice nullmap;
-    if (_is_nullable && _null_bitmap_builder->has_null()) {
+    if (is_nullable() && _null_bitmap_builder->has_null()) {
         nullmap = _null_bitmap_builder->finish();
         body.push_back(nullmap.slice());
     }
@@ -346,7 +394,9 @@ Status ColumnWriter::_finish_current_page() {
     data_page_footer->set_first_ordinal(_first_rowid);
     data_page_footer->set_num_values(_next_rowid - _first_rowid);
     data_page_footer->set_nullmap_size(nullmap.slice().size);
-
+    if (_new_page_callback != nullptr) {
+        _new_page_callback->put_extra_info_in_page(data_page_footer);
+    }
     // trying to compress page body
     OwnedSlice compressed_body;
     RETURN_IF_ERROR(PageIO::compress_page_body(
@@ -365,5 +415,85 @@ Status ColumnWriter::_finish_current_page() {
     return Status::OK();
 }
 
+////////////////////////////////////////////////////////////////////////////////
+
+ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts,
+                         std::unique_ptr<Field> field,
+                                     ScalarColumnWriter* offset_writer,
+                         std::unique_ptr<ColumnWriter> item_writer):
+        ColumnWriter(std::move(field), opts.meta->is_nullable()), _item_writer(std::move(item_writer)) {
+    _offset_writer.reset(offset_writer);
+}
+
+Status ArrayColumnWriter::init() {
+    RETURN_IF_ERROR(_offset_writer->init());
+    RETURN_IF_ERROR(_item_writer->init());
+    _offset_writer->register_flush_page_callback(this);
+    return Status::OK();
+}
+
+Status ArrayColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
+    footer->set_next_array_item_ordinal( _item_writer->get_next_rowid());
+    return Status::OK();
+}
+
+// Appends num_rows arrays one by one (no bulk write yet) and advances *ptr past every array written.
+Status ArrayColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
+    size_t remaining = num_rows;
+    const auto* col_cursor = reinterpret_cast<const Collection*>(*ptr);
+    while (remaining > 0) {
+        // TODO llj: bulk write
+        size_t num_written = 1;
+        ordinal_t next_item_ordinal = _item_writer->get_next_rowid(); // offset entry: ordinal of this array's first item
+        ordinal_t* next_item_ordinal_ptr = &next_item_ordinal;
+        RETURN_IF_ERROR(_offset_writer->append_data((const uint8_t**)&next_item_ordinal_ptr, num_written));
+        // write child items; nullable items go one by one so each is appended with its own null sign.
+        if(_item_writer->is_nullable()) {
+            auto* item_data_ptr = col_cursor->data;
+            for (size_t i = 0; i < col_cursor->length; ++i) {
+                RETURN_IF_ERROR(_item_writer->append(col_cursor->null_signs[i], item_data_ptr));
+                item_data_ptr = (uint8_t*)item_data_ptr + _item_writer->get_field()->size();
+            }
+        } else {
+            RETURN_IF_ERROR(_item_writer->append_not_nulls(col_cursor->data, col_cursor->length));
+        }
+        // advance the caller's cursor as well: append_nullable() reuses *ptr across non-null runs.
+        remaining -= num_written;
+        col_cursor += num_written;
+        *ptr = reinterpret_cast<const uint8_t*>(col_cursor);
+    }
+    return Status::OK();
+}
+
+uint64_t ArrayColumnWriter::estimate_buffer_size() {
+    return _offset_writer->estimate_buffer_size() + _item_writer->estimate_buffer_size();
+}
+
+Status ArrayColumnWriter::finish() {
+    RETURN_IF_ERROR(_offset_writer->finish());
+    RETURN_IF_ERROR(_item_writer->finish());
+    return Status::OK();
 }
+
+Status ArrayColumnWriter::write_data() {
+    RETURN_IF_ERROR(_offset_writer->write_data());
+    RETURN_IF_ERROR(_item_writer->write_data());
+    return Status::OK();
+}
+
+Status ArrayColumnWriter::write_ordinal_index() {
+    RETURN_IF_ERROR(_offset_writer->write_ordinal_index());
+    RETURN_IF_ERROR(_item_writer->write_ordinal_index());
+    return Status::OK();
+}
+
+Status ArrayColumnWriter::append_nulls(size_t num_rows) {
+    return _offset_writer->append_nulls(num_rows);
+}
+
+Status ArrayColumnWriter::finish_current_page() {
+    return _offset_writer->finish_current_page();
+}
+
+} // namespace segment_v2 end
 }
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h
index 8aec7f5..a687a96 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -23,6 +23,7 @@
 #include "gen_cpp/segment_v2.pb.h" // for EncodingTypePB
 #include "olap/rowset/segment_v2/common.h"
 #include "olap/rowset/segment_v2/page_pointer.h" // for PagePointer
+#include "olap/tablet_schema.h" // for TabletColumn
 #include "util/bitmap.h" // for BitmapChange
 #include "util/slice.h" // for OwnedSlice
 
@@ -59,18 +60,19 @@ class PageBuilder;
 class BloomFilterIndexWriter;
 class ZoneMapIndexWriter;
 
-// Encode one column's data into some memory slice.
-// Because some columns would be stored in a file, we should wait
-// until all columns has been finished, and then data can be written
-// to file
 class ColumnWriter {
 public:
-    ColumnWriter(const ColumnWriterOptions& opts,
-                 std::unique_ptr<Field> field,
-                 fs::WritableBlock* output_file);
-    ~ColumnWriter();
+    static Status create(const ColumnWriterOptions& opts,
+                         const TabletColumn* column,
+                         fs::WritableBlock* _wblock,
+                         std::unique_ptr<ColumnWriter>* writer);
+
+    explicit ColumnWriter(std::unique_ptr<Field> field, bool is_nullable)
+    : _field(std::move(field)), _is_nullable(is_nullable) {}
 
-    Status init();
+    virtual ~ColumnWriter() = default;
+
+    virtual Status init() = 0;
 
     template<typename CellType>
     Status append(const CellType& cell) {
@@ -79,7 +81,7 @@ public:
             BitmapChange(&nullmap, 0, cell.is_null());
             return append_nullable(&nullmap, cell.cell_ptr(), 1);
         } else {
-            return append(cell.cell_ptr(), 1);
+            return append_not_nulls(cell.cell_ptr(), 1);
         }
     }
 
@@ -91,23 +93,95 @@ public:
         return append_nullable(&nullmap, data, 1);
     }
 
-    Status append_nulls(size_t num_rows);
-    Status append(const void* data, size_t num_rows);
     Status append_nullable(const uint8_t* nullmap, const void* data, size_t num_rows);
 
-    uint64_t estimate_buffer_size();
+    Status append_not_nulls(const void* data, size_t num_rows);
+
+    virtual Status append_nulls(size_t num_rows) = 0;
+
+    virtual Status finish_current_page() = 0;
+
+    virtual uint64_t estimate_buffer_size() = 0;
 
     // finish append data
-    Status finish();
+    virtual Status finish() = 0;
 
     // write all data into file
-    Status write_data();
-    Status write_ordinal_index();
-    Status write_zone_map();
-    Status write_bitmap_index();
-    Status write_bloom_filter_index();
+    virtual Status write_data() = 0;
+
+    virtual Status write_ordinal_index() = 0;
+
+    virtual Status write_zone_map() = 0;
+
+    virtual Status write_bitmap_index() = 0;
+
+    virtual Status write_bloom_filter_index() = 0;
+
+    virtual ordinal_t get_next_rowid() const = 0;
+
+    // Used to append non-null data.
+    virtual Status append_data(const uint8_t** ptr, size_t num_rows) = 0;
+
+    bool is_nullable() const { return _is_nullable; }
+
+    Field* get_field() const { return _field.get(); }
 
 private:
+    std::unique_ptr<Field> _field;
+    bool _is_nullable;
+};
+
+class FlushPageCallback {
+public:
+    virtual Status put_extra_info_in_page(DataPageFooterPB* footer) { return Status::OK(); }
+};
+
+// Encodes one column's data into memory slices.
+// Because some columns would be stored in one file, we should wait
+// until all columns have been finished, and then the data can be written
+// to the file.
+class ScalarColumnWriter final : public ColumnWriter {
+public:
+    ScalarColumnWriter(const ColumnWriterOptions& opts,
+                           std::unique_ptr<Field> field,
+                           fs::WritableBlock* output_file);
+
+    ~ScalarColumnWriter() override;
+
+    Status init() override;
+
+    inline Status append_nulls(size_t num_rows) override ;
+
+    Status finish_current_page() override;
+
+    uint64_t estimate_buffer_size() override;
+
+    // finish append data
+    Status finish() override;
+
+    Status write_data() override;
+    Status write_ordinal_index() override;
+    Status write_zone_map() override;
+    Status write_bitmap_index() override;
+    Status write_bloom_filter_index() override;
+    ordinal_t get_next_rowid() const override { return _next_rowid; }
+
+    void register_flush_page_callback(FlushPageCallback* flush_page_callback) {
+        _new_page_callback = flush_page_callback;
+    }
+    Status append_data(const uint8_t** ptr, size_t num_rows) override;
+
+private:
+    std::unique_ptr<PageBuilder> _page_builder;
+
+    std::unique_ptr<NullBitmapBuilder> _null_bitmap_builder;
+
+    ColumnWriterOptions _opts;
+
+    const EncodingInfo* _encoding_info = nullptr;
+
+    ordinal_t _next_rowid = 0;
+
     // All Pages will be organized into a linked list
     struct Page {
         // the data vector may contain:
@@ -141,35 +215,64 @@ private:
         _data_size += 20;
     }
 
-    Status _append_data(const uint8_t** ptr, size_t num_rows);
-    Status _finish_current_page();
     Status _write_data_page(Page* page);
 
 private:
-    ColumnWriterOptions _opts;
-    std::unique_ptr<Field> _field;
     fs::WritableBlock* _wblock = nullptr;
-    bool _is_nullable;
     // total size of data page list
     uint64_t _data_size;
 
     // cached generated pages,
     PageHead _pages;
     ordinal_t _first_rowid = 0;
-    ordinal_t _next_rowid = 0;
 
-    const EncodingInfo* _encoding_info = nullptr;
     const BlockCompressionCodec* _compress_codec = nullptr;
 
-    std::unique_ptr<PageBuilder> _page_builder;
-    std::unique_ptr<NullBitmapBuilder> _null_bitmap_builder;
-
     std::unique_ptr<OrdinalIndexWriter> _ordinal_index_builder;
     std::unique_ptr<ZoneMapIndexWriter> _zone_map_index_builder;
     std::unique_ptr<BitmapIndexWriter> _bitmap_index_builder;
     std::unique_ptr<BloomFilterIndexWriter> _bloom_filter_index_builder;
+
+    // Called before a data page is flushed.
+    FlushPageCallback* _new_page_callback = nullptr;
 };
 
+class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback {
+public:
+    explicit ArrayColumnWriter(const ColumnWriterOptions& opts,
+                      std::unique_ptr<Field> field,
+                               ScalarColumnWriter* offset_writer,
+                      std::unique_ptr<ColumnWriter> item_writer);
+    ~ArrayColumnWriter() override = default;
 
-}
-}
+    Status init() override ;
+
+    Status append_data(const uint8_t** ptr, size_t num_rows) override;
+
+    uint64_t estimate_buffer_size() override ;
+
+    Status finish() override ;
+    Status write_data() override ;
+    Status write_ordinal_index() override ;
+    inline Status append_nulls(size_t num_rows) override ;
+
+    Status finish_current_page() override;
+
+    Status write_zone_map() override { return Status::OK(); }
+
+    Status write_bitmap_index() override { return Status::OK(); }
+
+    Status write_bloom_filter_index() override { return Status::OK(); }
+
+    ordinal_t get_next_rowid() const override { return _offset_writer->get_next_rowid(); }
+
+private:
+    Status put_extra_info_in_page(DataPageFooterPB* header) override ;
+
+private:
+    std::unique_ptr<ScalarColumnWriter> _offset_writer;
+    std::unique_ptr<ColumnWriter> _item_writer;
+};
+
+}  // namespace segment_v2
+}  // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/encoding_info.cpp b/be/src/olap/rowset/segment_v2/encoding_info.cpp
index 5993d15..b37bcf4 100644
--- a/be/src/olap/rowset/segment_v2/encoding_info.cpp
+++ b/be/src/olap/rowset/segment_v2/encoding_info.cpp
@@ -81,6 +81,18 @@ struct TypeEncodingTraits<type, BIT_SHUFFLE, CppType,
 };
 
 template<>
+struct TypeEncodingTraits<OLAP_FIELD_TYPE_ARRAY, BIT_SHUFFLE, Collection> {
+    static Status create_page_builder(const PageBuilderOptions& opts, PageBuilder** builder) {
+        *builder = new BitshufflePageBuilder<OLAP_FIELD_TYPE_UNSIGNED_BIGINT>(opts);
+        return Status::OK();
+    }
+    static Status create_page_decoder(const Slice& data, const PageDecoderOptions& opts, PageDecoder** decoder) {
+        *decoder = new BitShufflePageDecoder<OLAP_FIELD_TYPE_UNSIGNED_BIGINT>(data, opts);
+        return Status::OK();
+    }
+};
+
+template<>
 struct TypeEncodingTraits<OLAP_FIELD_TYPE_BOOL, RLE, bool> {
     static Status create_page_builder(const PageBuilderOptions& opts, PageBuilder** builder) {
         *builder = new RlePageBuilder<OLAP_FIELD_TYPE_BOOL>(opts);
@@ -211,6 +223,9 @@ EncodingInfoResolver::EncodingInfoResolver() {
     _add_map<OLAP_FIELD_TYPE_BIGINT, FOR_ENCODING, true>();
     _add_map<OLAP_FIELD_TYPE_BIGINT, PLAIN_ENCODING>();
 
+    _add_map<OLAP_FIELD_TYPE_UNSIGNED_BIGINT, BIT_SHUFFLE>();
+    _add_map<OLAP_FIELD_TYPE_ARRAY, BIT_SHUFFLE>();
+
     _add_map<OLAP_FIELD_TYPE_LARGEINT, BIT_SHUFFLE>();
     _add_map<OLAP_FIELD_TYPE_LARGEINT, PLAIN_ENCODING>();
     _add_map<OLAP_FIELD_TYPE_LARGEINT, FOR_ENCODING, true>();
diff --git a/be/src/olap/rowset/segment_v2/frame_of_reference_page.h b/be/src/olap/rowset/segment_v2/frame_of_reference_page.h
index 0c0b5d7..cb13717 100644
--- a/be/src/olap/rowset/segment_v2/frame_of_reference_page.h
+++ b/be/src/olap/rowset/segment_v2/frame_of_reference_page.h
@@ -152,6 +152,11 @@ public:
     }
 
     Status next_batch(size_t* n, ColumnBlockView* dst) override {
+        return next_batch<true>(n, dst);
+    }
+
+    template<bool forward_index>
+    inline Status next_batch(size_t* n, ColumnBlockView* dst) {
         DCHECK(_parsed) << "Must call init() firstly";
         if (PREDICT_FALSE(*n == 0 || _cur_index >= _num_elements)) {
             *n = 0;
@@ -161,11 +166,17 @@ public:
         size_t to_fetch = std::min(*n, static_cast<size_t>(_num_elements - _cur_index));
         uint8_t* data_ptr = dst->data();
         _decoder->get_batch(reinterpret_cast<CppType*>(data_ptr), to_fetch);
-        _cur_index += to_fetch;
+        if (forward_index) {
+            _cur_index += to_fetch;
+        }
         *n = to_fetch;
         return Status::OK();
     }
 
+    Status peek_next_batch(size_t *n, ColumnBlockView* dst) override {
+        return next_batch<false>(n, dst);
+    }
+
     size_t count() const override {
         return _num_elements;
     }
diff --git a/be/src/olap/rowset/segment_v2/page_decoder.h b/be/src/olap/rowset/segment_v2/page_decoder.h
index ee7e84b..3bce749 100644
--- a/be/src/olap/rowset/segment_v2/page_decoder.h
+++ b/be/src/olap/rowset/segment_v2/page_decoder.h
@@ -81,6 +81,13 @@ public:
     // allocated in the column_vector_view's mem_pool.
     virtual Status next_batch(size_t* n, ColumnBlockView* dst) = 0;
 
+    // Same as `next_batch`, except that the cursor is not moved forward.
+    // When reading an array's ordinals in `ArrayFileColumnIterator`, we want to read one extra
+    // ordinal without moving the cursor forward.
+    virtual Status peek_next_batch(size_t *n, ColumnBlockView* dst) {
+        return Status::NotSupported("peek_next_batch");
+    }
+
     // Return the number of elements in this page.
     virtual size_t count() const = 0;
 
@@ -88,6 +95,8 @@ public:
     // entry (ie the entry that will next be returned by next_vector())
     virtual size_t current_index() const = 0;
 
+    bool has_remaining() const { return current_index() < count(); }
+
 private:
     DISALLOW_COPY_AND_ASSIGN(PageDecoder);
 };
diff --git a/be/src/olap/rowset/segment_v2/parsed_page.h b/be/src/olap/rowset/segment_v2/parsed_page.h
index 7291ede..03e9eee 100644
--- a/be/src/olap/rowset/segment_v2/parsed_page.h
+++ b/be/src/olap/rowset/segment_v2/parsed_page.h
@@ -62,9 +62,12 @@ struct ParsedPage {
 
         page->first_ordinal = footer.first_ordinal();
         page->num_rows = footer.num_values();
+
         page->page_pointer = page_pointer;
         page->page_index = page_index;
 
+        page->next_array_item_ordinal = footer.next_array_item_ordinal();
+
         *result = std::move(page);
         return Status::OK();
     }
@@ -84,6 +87,8 @@ struct ParsedPage {
     ordinal_t first_ordinal = 0;
     // number of rows including nulls and not-nulls
     ordinal_t num_rows = 0;
+    // just for array type
+    ordinal_t next_array_item_ordinal = 0;
 
     PagePointer page_pointer;
     uint32_t page_index = 0;
diff --git a/be/src/olap/rowset/segment_v2/plain_page.h b/be/src/olap/rowset/segment_v2/plain_page.h
index 6908b09..e9d70fb 100644
--- a/be/src/olap/rowset/segment_v2/plain_page.h
+++ b/be/src/olap/rowset/segment_v2/plain_page.h
@@ -195,7 +195,12 @@ public:
         return Status::OK();
     }
 
-    Status next_batch(size_t *n, ColumnBlockView *dst) override {
+    Status next_batch(size_t* n, ColumnBlockView* dst) override {
+        return next_batch<true>(n, dst);
+    }
+
+    template<bool forward_index>
+    inline Status next_batch(size_t *n, ColumnBlockView *dst) {
         DCHECK(_parsed);
 
         if (PREDICT_FALSE(*n == 0 || _cur_idx >= _num_elems)) {
@@ -207,11 +212,17 @@ public:
         memcpy(dst->data(),
                &_data[PLAIN_PAGE_HEADER_SIZE + _cur_idx * SIZE_OF_TYPE],
                max_fetch * SIZE_OF_TYPE);
-        _cur_idx += max_fetch;
+        if (forward_index) {
+            _cur_idx += max_fetch;
+        }
         *n = max_fetch;
         return Status::OK();
     }
 
+    Status peek_next_batch(size_t *n, ColumnBlockView* dst) override {
+        return next_batch<false>(n, dst);
+    }
+
     size_t count() const override {
         DCHECK(_parsed);
         return _num_elems;
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index 2ad6b3a..73fb8ae 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -189,11 +189,12 @@ Status Segment::new_column_iterator(uint32_t cid, ColumnIterator** iter) {
         if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
             return Status::InternalError("invalid nonexistent column without default value.");
         }
+        TypeInfo* type_info = get_type_info(&tablet_column);
         std::unique_ptr<DefaultValueColumnIterator> default_value_iter(
                 new DefaultValueColumnIterator(tablet_column.has_default_value(),
                 tablet_column.default_value(),
                 tablet_column.is_nullable(),
-                tablet_column.type(),
+                type_info,
                 tablet_column.length()));
         ColumnIteratorOptions iter_opts;
         RETURN_IF_ERROR(default_value_iter->init(iter_opts));
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index a977407..8fb0ddc 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -48,31 +48,50 @@ SegmentWriter::SegmentWriter(fs::WritableBlock* wblock,
 
 SegmentWriter::~SegmentWriter() = default;
 
+// Fills `meta` from `column`, then recurses into each sub column under a new child meta.
+// One id is taken from *column_id per visited column (parent first, children in order).
+void SegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t* column_id, const TabletColumn& column) {
+    // TODO(zc): Do we need this column_id??
+    meta->set_column_id((*column_id)++);
+    meta->set_unique_id(column.unique_id());
+    meta->set_type(column.type());
+    meta->set_length(column.length());
+    meta->set_encoding(DEFAULT_ENCODING);
+    meta->set_compression(LZ4F);
+    meta->set_is_nullable(column.is_nullable());
+    for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
+        _init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i));
+    }
+}
+
 Status SegmentWriter::init(uint32_t write_mbytes_per_sec __attribute__((unused))) {
     uint32_t column_id = 0;
     _column_writers.reserve(_tablet_schema->columns().size());
     for (auto& column : _tablet_schema->columns()) {
-        std::unique_ptr<Field> field(FieldFactory::create(column));
-        DCHECK(field.get() != nullptr);
-
         ColumnWriterOptions opts;
         opts.meta = _footer.add_columns();
-        // TODO(zc): Do we need this column_id??
-        opts.meta->set_column_id(column_id++);
-        opts.meta->set_unique_id(column.unique_id());
-        opts.meta->set_type(field->type());
-        opts.meta->set_length(column.length());
-        opts.meta->set_encoding(DEFAULT_ENCODING);
-        opts.meta->set_compression(LZ4F);
-        opts.meta->set_is_nullable(column.is_nullable());
+
+        _init_column_meta(opts.meta, &column_id, column);
 
         // now we create zone map for key columns
+        // and do not support zone map for array type.
         opts.need_zone_map = column.is_key() || _tablet_schema->keys_type() == KeysType::DUP_KEYS;
+        if (column.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+            opts.need_zone_map = false;
+        }
         opts.need_bloom_filter = column.is_bf_column();
         opts.need_bitmap_index = column.has_bitmap_index();
-
-        std::unique_ptr<ColumnWriter> writer(
-                new ColumnWriter(opts, std::move(field), _wblock));
+        if (column.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+            if (opts.need_bloom_filter) {
+                return Status::NotSupported("Do not support bloom filter for array type" );
+            }
+            if (opts.need_bitmap_index) {
+                return Status::NotSupported("Do not support bitmap index for array type" );
+            }
+        }
+
+        std::unique_ptr<ColumnWriter> writer;
+        RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _wblock, &writer));
         RETURN_IF_ERROR(writer->init());
         _column_writers.push_back(std::move(writer));
     }
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h
index 4703f68..3acf625 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -31,6 +31,7 @@ namespace doris {
 class RowBlock;
 class RowCursor;
 class TabletSchema;
+class TabletColumn;
 class ShortKeyIndexBuilder;
 
 namespace fs {
@@ -77,6 +78,7 @@ private:
     Status _write_short_key_index();
     Status _write_footer();
     Status _write_raw_data(const std::vector<Slice>& slices);
+    void _init_column_meta(ColumnMetaPB* meta, uint32_t* column_id, const TabletColumn& column);
 
 private:
     uint32_t _segment_id;
diff --git a/be/src/olap/rowset/segment_v2/zone_map_index.cpp b/be/src/olap/rowset/segment_v2/zone_map_index.cpp
index 83cd124..5613e2c 100644
--- a/be/src/olap/rowset/segment_v2/zone_map_index.cpp
+++ b/be/src/olap/rowset/segment_v2/zone_map_index.cpp
@@ -121,10 +121,14 @@ Status ZoneMapIndexReader::load(bool use_page_cache, bool kept_in_memory) {
 
     // read and cache all page zone maps
     for (int i = 0; i < reader.num_values(); ++i) {
-        Slice value;
-        uint8_t nullmap;
         size_t num_to_read = 1;
-        ColumnBlock block(reader.type_info(), (uint8_t*) &value, &nullmap, num_to_read, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb;
+        RETURN_IF_ERROR(ColumnVectorBatch::create(num_to_read,
+                                  false,
+                                  reader.type_info(),
+                                  nullptr,
+                                  &cvb));
+        ColumnBlock block(cvb.get(), &pool);
         ColumnBlockView column_block_view(&block);
 
         RETURN_IF_ERROR(iter.seek_to_ordinal(i));
@@ -132,7 +136,8 @@ Status ZoneMapIndexReader::load(bool use_page_cache, bool kept_in_memory) {
         RETURN_IF_ERROR(iter.next_batch(&num_read, &column_block_view));
         DCHECK(num_to_read == num_read);
 
-        if (!_page_zone_maps[i].ParseFromArray(value.data, value.size)) {
+        Slice* value = reinterpret_cast<Slice*>(cvb->data());
+        if (!_page_zone_maps[i].ParseFromArray(value->data, value->size)) {
             return Status::Corruption("Failed to parse zone map");
         }
         pool.clear();
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 29ef956..7b5719d 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <vector>
-
 #include "olap/tablet_schema.h"
 #include "tablet_meta.h"
 
@@ -68,7 +66,7 @@ FieldType TabletColumn::get_field_type_by_string(const std::string& type_str) {
     } else if (0 == upper_type_str.compare("STRUCT")) {
         type = OLAP_FIELD_TYPE_STRUCT;
     } else if (0 == upper_type_str.compare("LIST")) {
-        type = OLAP_FIELD_TYPE_LIST;
+        type = OLAP_FIELD_TYPE_ARRAY;
     } else if (0 == upper_type_str.compare("MAP")) {
         type = OLAP_FIELD_TYPE_MAP;
     } else if (0 == upper_type_str.compare("OBJECT")) {
@@ -172,7 +170,7 @@ std::string TabletColumn::get_string_by_field_type(FieldType type) {
         case OLAP_FIELD_TYPE_STRUCT:
             return "STRUCT";
 
-        case OLAP_FIELD_TYPE_LIST:
+        case OLAP_FIELD_TYPE_ARRAY:
             return "LIST";
 
         case OLAP_FIELD_TYPE_MAP:
@@ -269,6 +267,17 @@ TabletColumn::TabletColumn(FieldAggregationMethod agg, FieldType filed_type, boo
     _is_nullable = is_nullable;
 }
 
+TabletColumn::TabletColumn(FieldAggregationMethod agg,
+                           FieldType filed_type,
+                           bool is_nullable,
+                           int32_t unique_id,
+                           size_t length) {
+    _aggregation = agg;
+    _type = filed_type;
+    _is_nullable = is_nullable;
+    _unique_id = unique_id;
+    _length = length;
+}
 void TabletColumn::init_from_pb(const ColumnPB& column) {
     _unique_id = column.unique_id();
     _col_name = column.name();
@@ -312,6 +321,12 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
     if (column.has_visible()) {
         _visible = column.visible();
     }
+    if (_type == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+        DCHECK(column.children_columns_size() == 1) << "LIST type has more than 1 children types.";
+        TabletColumn child_column;
+        child_column.init_from_pb(column.children_columns(0));
+        add_sub_column(child_column);
+    }
 }
 
 void TabletColumn::to_schema_pb(ColumnPB* column) {
@@ -342,6 +357,12 @@ void TabletColumn::to_schema_pb(ColumnPB* column) {
     column->set_visible(_visible);
 }
 
+void TabletColumn::add_sub_column(TabletColumn& sub_column) {
+    sub_column._parent = this; // set before push_back so the stored copy is linked to its parent too
+    _sub_columns.push_back(sub_column);
+    _sub_column_count += 1;
+}
+
 void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
     _keys_type = schema.keys_type();
     _num_columns = 0;
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 480f0a5..b38d688 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -31,6 +31,7 @@ public:
     TabletColumn();
     TabletColumn(FieldAggregationMethod agg, FieldType type);
     TabletColumn(FieldAggregationMethod agg, FieldType filed_type, bool is_nullable);
+    TabletColumn(FieldAggregationMethod agg, FieldType filed_type, bool is_nullable, int32_t unique_id, size_t length);
     void init_from_pb(const ColumnPB& column);
     void to_schema_pb(ColumnPB* column);
 
@@ -52,6 +53,17 @@ public:
     int precision() const { return _precision; }
     int frac() const { return _frac; }
     inline bool visible() { return _visible; }
+    /**
+     * Add a sub column.
+     */
+    void add_sub_column(TabletColumn& sub_column);
+
+    uint32_t get_subtype_count() const {
+        return _sub_column_count;
+    }
+    const TabletColumn& get_sub_column(uint32_t i) const {
+        return _sub_columns[i];
+    }
 
     friend bool operator==(const TabletColumn& a, const TabletColumn& b);
     friend bool operator!=(const TabletColumn& a, const TabletColumn& b);
@@ -88,6 +100,10 @@ private:
 
     bool _has_bitmap_index = false;
     bool _visible = true;
+
+    TabletColumn* _parent = nullptr;
+    std::vector<TabletColumn> _sub_columns;
+    uint32_t _sub_column_count = 0;
 };
 
 bool operator==(const TabletColumn& a, const TabletColumn& b);
diff --git a/be/src/olap/types.cpp b/be/src/olap/types.cpp
index 182e247..4f90b81 100644
--- a/be/src/olap/types.cpp
+++ b/be/src/olap/types.cpp
@@ -22,7 +22,7 @@ namespace doris {
 void (*FieldTypeTraits<OLAP_FIELD_TYPE_CHAR>::set_to_max)(void*) = nullptr;
 
 template<typename TypeTraitsClass>
-TypeInfo::TypeInfo(TypeTraitsClass t)
+ScalarTypeInfo::ScalarTypeInfo(TypeTraitsClass t)
       : _equal(TypeTraitsClass::equal),
         _cmp(TypeTraitsClass::cmp),
         _shallow_copy(TypeTraitsClass::shallow_copy),
@@ -39,36 +39,37 @@ TypeInfo::TypeInfo(TypeTraitsClass t)
         _field_type(TypeTraitsClass::type) {
 }
 
-class TypeInfoResolver {
-    DECLARE_SINGLETON(TypeInfoResolver);
+class ScalarTypeInfoResolver {
+    DECLARE_SINGLETON(ScalarTypeInfoResolver);
 public:
     TypeInfo* get_type_info(const FieldType t) {
-        auto pair = _mapping.find(t);
-        DCHECK(pair != _mapping.end()) << "Bad field type: " << t;
+        auto pair = _scalar_type_mapping.find(t);
+        DCHECK(pair != _scalar_type_mapping.end()) << "Bad field type: " << t;
         return pair->second.get();
     }
 
 private:
     template<FieldType field_type> void add_mapping() {
         TypeTraits<field_type> traits;
-        _mapping.emplace(field_type,
-                 std::shared_ptr<TypeInfo>(new TypeInfo(traits)));
+        _scalar_type_mapping.emplace(field_type,
+                 std::shared_ptr<TypeInfo>(new ScalarTypeInfo(traits)));
     }
 
     std::unordered_map<FieldType,
         std::shared_ptr<TypeInfo>,
-        std::hash<size_t>> _mapping;
+        std::hash<size_t>> _scalar_type_mapping;
 
-    DISALLOW_COPY_AND_ASSIGN(TypeInfoResolver);
+    DISALLOW_COPY_AND_ASSIGN(ScalarTypeInfoResolver);
 };
 
-TypeInfoResolver::TypeInfoResolver() {
+ScalarTypeInfoResolver::ScalarTypeInfoResolver() {
     add_mapping<OLAP_FIELD_TYPE_TINYINT>();
     add_mapping<OLAP_FIELD_TYPE_SMALLINT>();
     add_mapping<OLAP_FIELD_TYPE_INT>();
     add_mapping<OLAP_FIELD_TYPE_UNSIGNED_INT>();
     add_mapping<OLAP_FIELD_TYPE_BOOL>();
     add_mapping<OLAP_FIELD_TYPE_BIGINT>();
+    add_mapping<OLAP_FIELD_TYPE_UNSIGNED_BIGINT>();
     add_mapping<OLAP_FIELD_TYPE_LARGEINT>();
     add_mapping<OLAP_FIELD_TYPE_FLOAT>();
     add_mapping<OLAP_FIELD_TYPE_DOUBLE>();
@@ -81,10 +82,98 @@ TypeInfoResolver::TypeInfoResolver() {
     add_mapping<OLAP_FIELD_TYPE_OBJECT>();
 }
 
-TypeInfoResolver::~TypeInfoResolver() {}
+ScalarTypeInfoResolver::~ScalarTypeInfoResolver() {}
 
+bool is_scalar_type(FieldType field_type) {
+    switch (field_type) {
+    case OLAP_FIELD_TYPE_STRUCT:
+    case OLAP_FIELD_TYPE_ARRAY:
+    case OLAP_FIELD_TYPE_MAP:
+        return false;
+    default: return true;
+    }
+}
+
+TypeInfo* get_scalar_type_info(FieldType field_type) {
+    return ScalarTypeInfoResolver::instance()->get_type_info(field_type);
+}
+
+
+// Singleton registry that maps an item FieldType to the TypeInfo of an array of that item.
+class ArrayTypeInfoResolver {
+    DECLARE_SINGLETON(ArrayTypeInfoResolver);
+public:
+    // Returns the array TypeInfo for item type `t`; DCHECKs and returns nullptr if unregistered.
+    TypeInfo* get_type_info(const FieldType t) {
+        auto pair = _type_mapping.find(t);
+        DCHECK(pair != _type_mapping.end()) << "Bad field type: list<" << t << ">";
+        return pair != _type_mapping.end() ? pair->second.get() : nullptr;
+    }
+private:
+    template<FieldType item_type> void add_mapping() {
+        _type_mapping.emplace(item_type, std::shared_ptr<TypeInfo>(
+                new ArrayTypeInfo(get_scalar_type_info(item_type))));
+    }
+
+    // item FieldType -> TypeInfo of the array holding that item type
+    std::unordered_map<FieldType, std::shared_ptr<TypeInfo>, std::hash<size_t>> _type_mapping;
+};
+
+ArrayTypeInfoResolver::~ArrayTypeInfoResolver() = default;
+
+ArrayTypeInfoResolver::ArrayTypeInfoResolver() {
+    add_mapping<OLAP_FIELD_TYPE_TINYINT>();
+    add_mapping<OLAP_FIELD_TYPE_SMALLINT>();
+    add_mapping<OLAP_FIELD_TYPE_INT>();
+    add_mapping<OLAP_FIELD_TYPE_UNSIGNED_INT>();
+    add_mapping<OLAP_FIELD_TYPE_BOOL>();
+    add_mapping<OLAP_FIELD_TYPE_BIGINT>();
+    add_mapping<OLAP_FIELD_TYPE_LARGEINT>();
+    add_mapping<OLAP_FIELD_TYPE_FLOAT>();
+    add_mapping<OLAP_FIELD_TYPE_DOUBLE>();
+    add_mapping<OLAP_FIELD_TYPE_DECIMAL>();
+    add_mapping<OLAP_FIELD_TYPE_DATE>();
+    add_mapping<OLAP_FIELD_TYPE_DATETIME>();
+    add_mapping<OLAP_FIELD_TYPE_CHAR>();
+    add_mapping<OLAP_FIELD_TYPE_VARCHAR>();
+}
+
+// equal to get_scalar_type_info
 TypeInfo* get_type_info(FieldType field_type) {
-    return TypeInfoResolver::instance()->get_type_info(field_type);
+    return get_scalar_type_info(field_type);
+}
+
+// Resolves the TypeInfo described by a segment column meta. Scalar types resolve
+// directly; an array resolves through the type of its first child column.
+// Returns nullptr for an unsupported type or an array meta without a child.
+TypeInfo* get_type_info(segment_v2::ColumnMetaPB* column_meta_pb) {
+    FieldType type = (FieldType)column_meta_pb->type();
+    if (is_scalar_type(type)) {
+        return get_scalar_type_info(type);
+    }
+    if (type != OLAP_FIELD_TYPE_ARRAY) {
+        DCHECK(false) << "Bad field type: " << type;
+        return nullptr;
+    }
+    DCHECK(column_meta_pb->children_columns_size() == 1) << "more than 1 child type.";
+    if (column_meta_pb->children_columns_size() < 1) return nullptr; // no child to read
+    FieldType child_type = (FieldType)column_meta_pb->children_columns(0).type();
+    return ArrayTypeInfoResolver::instance()->get_type_info(child_type);
+}
+
+// Resolves the TypeInfo of a tablet column: scalar types directly, arrays through the
+// type of their first sub column. Returns nullptr for an unsupported type or a childless array.
+TypeInfo* get_type_info(const TabletColumn* col) {
+    if (is_scalar_type(col->type())) {
+        return get_scalar_type_info(col->type());
+    }
+    if (col->type() != OLAP_FIELD_TYPE_ARRAY) {
+        DCHECK(false) << "Bad field type: " << col->type();
+        return nullptr;
+    }
+    DCHECK(col->get_subtype_count() == 1) << "more than 1 child type.";
+    if (col->get_subtype_count() < 1) return nullptr; // no sub column to read
+    return ArrayTypeInfoResolver::instance()->get_type_info(col->get_sub_column(0).type());
 }
 
 } // namespace doris
diff --git a/be/src/olap/types.h b/be/src/olap/types.h
index 0067092..1a44983 100644
--- a/be/src/olap/types.h
+++ b/be/src/olap/types.h
@@ -25,10 +25,13 @@
 #include <sstream>
 #include <string>
 
+#include "gen_cpp/segment_v2.pb.h" // for ColumnMetaPB
+#include "olap/collection.h"
+#include "olap/decimal12.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
+#include "olap/tablet_schema.h" // for TabletColumn
 #include "olap/uint24.h"
-#include "olap/decimal12.h"
 #include "runtime/mem_pool.h"
 #include "runtime/datetime_value.h"
 #include "util/hash_util.hpp"
@@ -38,53 +41,85 @@
 #include "util/string_parser.hpp"
 
 namespace doris {
+class TabletColumn;
 
 class TypeInfo {
 public:
-    inline bool equal(const void* left, const void* right) const {
+    // Polymorphic base: virtual so a concrete type info can be destroyed through a TypeInfo*.
+    virtual ~TypeInfo() = default;
+    virtual bool equal(const void* left, const void* right) const = 0;
+    virtual int cmp(const void* left, const void* right) const = 0;
+
+    virtual void shallow_copy(void* dest, const void* src) const = 0;
+    virtual void deep_copy(void* dest, const void* src, MemPool* mem_pool) const = 0;
+
+    // See copy_row_in_memtable() in olap/row.h, will be removed in future.
+    // It is same with deep_copy() for all type except for HLL and OBJECT type
+    virtual void copy_object(void* dest, const void* src, MemPool* mem_pool) const = 0;
+
+    virtual void direct_copy(void* dest, const void* src) const = 0;
+
+    //convert and deep copy value from other type's source
+    virtual OLAPStatus convert_from(void* dest, const void* src, const TypeInfo* src_type, MemPool* mem_pool) const = 0;
+
+    virtual OLAPStatus from_string(void* buf, const std::string& scan_key) const = 0;
+    virtual std::string to_string(const void* src) const = 0;
+
+    virtual void set_to_max(void* buf) const = 0;
+    virtual void set_to_min(void* buf) const = 0;
+
+    virtual uint32_t hash_code(const void* data, uint32_t seed) const = 0;
+    virtual const size_t size() const = 0;
+
+    virtual FieldType type() const = 0;
+};
+
+class ScalarTypeInfo : public TypeInfo {
+public:
+    inline bool equal(const void* left, const void* right) const override {
         return _equal(left, right);
     }
 
-    inline int cmp(const void* left, const void* right) const {
+    inline int cmp(const void* left, const void* right) const override {
         return _cmp(left, right);
     }
 
-    inline void shallow_copy(void* dest, const void* src) const {
+    inline void shallow_copy(void* dest, const void* src) const override {
         _shallow_copy(dest, src);
     }
 
-    inline void deep_copy(void* dest, const void* src, MemPool* mem_pool) const {
+    inline void deep_copy(void* dest, const void* src, MemPool* mem_pool) const override {
         _deep_copy(dest, src, mem_pool);
     }
 
     // See copy_row_in_memtable() in olap/row.h, will be removed in future.
     // It is same with deep_copy() for all type except for HLL and OBJECT type
-    inline void copy_object(void* dest, const void* src, MemPool* mem_pool) const {
+    inline void copy_object(void* dest, const void* src, MemPool* mem_pool) const override {
         _copy_object(dest, src, mem_pool);
     }
 
-    inline void direct_copy(void* dest, const void* src) const {
+    inline void direct_copy(void* dest, const void* src) const override {
         _direct_copy(dest, src);
     }
 
     //convert and deep copy value from other type's source
-    OLAPStatus convert_from(void* dest, const void* src, const TypeInfo* src_type, MemPool* mem_pool) const{
+    OLAPStatus convert_from(void* dest, const void* src, const TypeInfo* src_type, MemPool* mem_pool) const override {
         return _convert_from(dest, src, src_type, mem_pool);
     }
 
-    OLAPStatus from_string(void* buf, const std::string& scan_key) const {
+    OLAPStatus from_string(void* buf, const std::string& scan_key) const override {
         return _from_string(buf, scan_key);
     }
 
-    std::string to_string(const void* src) const { return _to_string(src); }
+    std::string to_string(const void* src) const override { return _to_string(src); }
 
-    inline void set_to_max(void* buf) const { _set_to_max(buf); }
-    inline void set_to_min(void* buf) const { _set_to_min(buf); }
+    inline void set_to_max(void* buf) const override { _set_to_max(buf); }
+    inline void set_to_min(void* buf) const override { _set_to_min(buf); }
 
-    inline uint32_t hash_code(const void* data, uint32_t seed) const { return _hash_code(data, seed); }
-    inline const size_t size() const { return _size; }
+    inline uint32_t hash_code(const void* data, uint32_t seed) const override { return _hash_code(data, seed); }
+    inline const size_t size() const override { return _size; }
 
-    inline FieldType type() const { return _field_type; }
+    inline FieldType type() const override { return _field_type; }
 private:
     bool (*_equal)(const void* left, const void* right);
     int (*_cmp)(const void* left, const void* right);
@@ -106,12 +141,210 @@ private:
     const size_t _size;
     const FieldType _field_type;
 
-    friend class TypeInfoResolver;
-    template<typename TypeTraitsClass> TypeInfo(TypeTraitsClass t);
+    friend class ScalarTypeInfoResolver;
+    template<typename TypeTraitsClass> ScalarTypeInfo(TypeTraitsClass t);
 };
 
+class ArrayTypeInfo : public TypeInfo {
+public:
+    explicit ArrayTypeInfo(TypeInfo* item_type_info)
+    : _item_type_info(item_type_info), _item_size(item_type_info->size()) {
+    }
+
+    // Element-wise equality of two array values.
+    // `left` and `right` must each point to a Collection. They are equal when the lengths
+    // match and, at every index, either both items are null or both are non-null and
+    // equal according to the item type.
+    inline bool equal(const void* left, const void* right) const override {
+        auto l_value = reinterpret_cast<const Collection*>(left);
+        auto r_value = reinterpret_cast<const Collection*>(right);
+        if (l_value->length != r_value->length) {
+            return false;
+        }
+        const size_t len = l_value->length;
+        // Fast path: neither side carries nulls, so null_signs is never consulted.
+        const bool no_nulls = !l_value->has_null && !r_value->has_null;
+
+        for (size_t i = 0; i < len; ++i) {
+            if (!no_nulls) {
+                // null_signs is only valid when has_null is set (deep_copy() leaves it
+                // nullptr otherwise), so guard each side before reading its flag.
+                const bool l_null = l_value->has_null && l_value->null_signs[i];
+                const bool r_null = r_value->has_null && r_value->null_signs[i];
+                if (l_null != r_null) { // exactly one side is null
+                    return false;
+                }
+                if (l_null) { // both are null
+                    continue;
+                }
+            }
+            if (!_item_type_info->equal((uint8_t*)(l_value->data) + i * _item_size,
+                                        (uint8_t*)(r_value->data) + i * _item_size)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Three-way lexicographic comparison of two array values.
+    // `left` and `right` must each point to a Collection.
+    // Items are compared index by index over the common prefix: a null item sorts
+    // before a non-null one, two nulls tie, and two non-null items defer to the item
+    // type's cmp(). If the whole common prefix ties, the shorter collection sorts first.
+    // Returns a negative value, zero or a positive value accordingly.
+    inline int cmp(const void* left, const void* right) const override {
+        auto l_value = reinterpret_cast<const Collection*>(left);
+        auto r_value = reinterpret_cast<const Collection*>(right);
+        const size_t l_length = l_value->length;
+        const size_t r_length = r_value->length;
+        // Fast path: neither side carries nulls, so null_signs is never consulted.
+        const bool no_nulls = !l_value->has_null && !r_value->has_null;
+
+        for (size_t cur = 0; cur < l_length && cur < r_length; ++cur) {
+            if (!no_nulls) {
+                // null_signs is only valid when has_null is set (deep_copy() leaves it
+                // nullptr otherwise), so guard each side before reading its flag.
+                const bool l_null = l_value->has_null && l_value->null_signs[cur];
+                const bool r_null = r_value->has_null && r_value->null_signs[cur];
+                if (l_null != r_null) { // exactly one side is null: null sorts first
+                    return l_null ? -1 : 1;
+                }
+                if (l_null) { // both are null
+                    continue;
+                }
+            }
+            int result = _item_type_info->cmp((uint8_t*)(l_value->data) + cur * _item_size,
+                                              (uint8_t*)(r_value->data) + cur * _item_size);
+            if (result != 0) {
+                return result;
+            }
+        }
+
+        // Common prefix is identical: order by length.
+        if (l_length < r_length) {
+            return -1;
+        } else if (l_length > r_length) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+    inline void shallow_copy(void* dest, const void* src) const override {
+        *reinterpret_cast<Collection*>(dest) = *reinterpret_cast<const Collection*>(src);
+    }
+
+    // Copies *src (a Collection) into *dest, allocating one buffer from mem_pool that holds
+    // the items followed by the null flags (if any); non-null items are deep-copied.
+    inline void deep_copy(void* dest, const void* src, MemPool* mem_pool) const override {
+        auto dest_value = reinterpret_cast<Collection*>(dest);
+        auto src_value = reinterpret_cast<const Collection*>(src);
+        dest_value->length = src_value->length;
+        size_t item_size = src_value->length * _item_size;
+        size_t nulls_size = src_value->has_null ? src_value->length : 0;
+        dest_value->data = mem_pool->allocate(item_size + nulls_size);
+        dest_value->has_null = src_value->has_null;
+        dest_value->null_signs = src_value->has_null ? reinterpret_cast<bool*>(dest_value->data) + item_size : nullptr;
+
+        // copy null_signs
+        if (src_value->has_null) {
+            memory_copy(dest_value->null_signs, src_value->null_signs, sizeof(bool) * src_value->length);
+        }
+
+        // copy item
+        for (uint32_t i = 0; i < src_value->length; ++i) {
+            if (dest_value->is_null_at(i)) continue;
+            _item_type_info->deep_copy((uint8_t*)(dest_value->data) + i * _item_size, (uint8_t*)(src_value->data) + i * _item_size, mem_pool);
+        }
+    }
+
+    inline void copy_object(void* dest, const void* src, MemPool* mem_pool) const override {
+        deep_copy(dest, src, mem_pool);
+    }
+
+    // TODO llj: How to ensure sufficient length of item
+    inline void direct_copy(void* dest, const void* src) const override {
+        auto dest_value = reinterpret_cast<Collection*>(dest);
+        auto src_value = reinterpret_cast<const Collection*>(src);
+
+        dest_value->length = src_value->length;
+        dest_value->has_null = src_value->has_null;
+        if (src_value->has_null) {
+            // direct copy null_signs
+            memory_copy(dest_value->null_signs, src_value->null_signs, src_value->length);
+        }
+
+        // direct copy item
+        for (uint32_t i = 0; i < src_value->length; ++i) {
+            if (dest_value->is_null_at(i)) continue;
+            _item_type_info->direct_copy((uint8_t*)(dest_value->data) + i * _item_size, (uint8_t*)(src_value->data) + i * _item_size);
+        }
+    }
+
+    OLAPStatus convert_from(void* dest, const void* src, const TypeInfo* src_type, MemPool* mem_pool) const override {
+        return OLAPStatus::OLAP_ERR_FUNC_NOT_IMPLEMENTED;
+    }
+
+    OLAPStatus from_string(void* buf, const std::string& scan_key) const override {
+        return OLAPStatus::OLAP_ERR_FUNC_NOT_IMPLEMENTED; // parsing an array from a string is not supported
+    }
+
+    std::string to_string(const void* src) const override {
+        auto src_value = reinterpret_cast<const Collection*>(src);
+        std::string result = "[";
+
+        for (size_t i = 0; i< src_value->length; ++i) {
+            std::string item = _item_type_info->to_string((uint8_t*)(src_value->data) + i * _item_size);
+            result += item;
+            if (i != src_value->length - 1) {
+                result += ", ";
+            }
+        }
+        result += "]";
+        return result;
+    }
+
+    inline void set_to_max(void* buf) const override {
+        DCHECK(false) << "set_to_max of list is not implemented.";
+    }
+
+    inline void set_to_min(void* buf) const override {
+        DCHECK(false) << "set_to_min of list is not implemented.";
+    }
+
+    inline uint32_t hash_code(const void* data, uint32_t seed) const override {
+        auto value = reinterpret_cast<const Collection*>(data);
+        // Seed with the length (hashing exactly that field's own bytes), then fold in each item.
+        uint32_t result = HashUtil::hash(&(value->length), sizeof(value->length), seed);
+        for (size_t i = 0; i < value->length; ++i) {
+            result = seed * result;
+            if (!(value->has_null && value->null_signs[i])) { // null_signs is only valid with has_null; nulls add nothing
+                result += _item_type_info->hash_code((uint8_t*)(value->data) + i * _item_size, seed);
+            }
+        }
+        return result;
+    }
+
+    inline const size_t size() const override { return sizeof(Collection); }
+
+    inline FieldType type() const override { return OLAP_FIELD_TYPE_ARRAY; }
+
+    inline const TypeInfo* item_type_info() const { return _item_type_info; }
+private:
+    const TypeInfo* _item_type_info;
+    const size_t _item_size;
+};
+
+extern bool is_scalar_type(FieldType field_type);
+
+extern TypeInfo* get_scalar_type_info(FieldType field_type);
+
 extern TypeInfo* get_type_info(FieldType field_type);
 
+extern TypeInfo* get_type_info(segment_v2::ColumnMetaPB* column_meta_pb);
+
+extern TypeInfo* get_type_info(const TabletColumn* col);
+
 // support following formats when convert varchar to date
 static const std::vector<std::string> DATE_FORMATS {
     "%Y-%m-%d",
@@ -188,6 +421,9 @@ template<> struct CppTypeTraits<OLAP_FIELD_TYPE_HLL> {
 template<> struct CppTypeTraits<OLAP_FIELD_TYPE_OBJECT> {
     using CppType = Slice;
 };
+template<> struct CppTypeTraits<OLAP_FIELD_TYPE_ARRAY> {
+    using CppType = Collection;
+};
 
 template<FieldType field_type>
 struct BaseFieldtypeTraits : public CppTypeTraits<field_type> {
diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt
index 8bc7010..ffc4507 100644
--- a/be/test/olap/CMakeLists.txt
+++ b/be/test/olap/CMakeLists.txt
@@ -47,6 +47,7 @@ ADD_BE_TEST(delta_writer_test)
 ADD_BE_TEST(serialize_test)
 ADD_BE_TEST(olap_meta_test)
 ADD_BE_TEST(decimal12_test)
+ADD_BE_TEST(column_vector_test)
 ADD_BE_TEST(storage_types_test)
 ADD_BE_TEST(aggregate_func_test)
 ADD_BE_TEST(rowset/segment_v2/bitshuffle_page_test)
diff --git a/be/test/olap/column_vector_test.cpp b/be/test/olap/column_vector_test.cpp
new file mode 100644
index 0000000..4d73a9e
--- /dev/null
+++ b/be/test/olap/column_vector_test.cpp
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "olap/collection.h"
+#include "olap/column_vector.h"
+#include "olap/tablet_schema_helper.h"
+#include "olap/types.cpp"
+#include "olap/field.h"
+
+#include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
+
+namespace doris {
+
+// Fixture for the ColumnVectorBatch tests; provides the MemPool the test bodies pass to helpers.
+class ColumnVectorTest : public testing::Test {
+public:
+    ColumnVectorTest() : _pool(&_tracker) {}
+
+protected:
+    // Kept protected (not private): TEST_F bodies are subclasses and use _pool directly.
+    // _tracker is declared before _pool because _pool is constructed from its address.
+    MemTracker _tracker;
+    MemPool _pool;
+};
+
+// Writes `data_size` cells of field type `type` into a nullable batch in two steps (first half,
+// then resize() and the rest) and checks each cell reads back equal; CHAR/VARCHAR compare as Slice.
+template<FieldType type>
+void test_read_write_scalar_column_vector(const TypeInfo* type_info, const uint8_t* src_data, size_t data_size) {
+    using Type = typename TypeTraits<type>::CppType;
+    const Type* typed_src = reinterpret_cast<const Type*>(src_data);
+    const size_t first_count = data_size / 2;
+    const size_t rest_count = data_size - first_count;
+    const bool slice_cells = type_info->type() == OLAP_FIELD_TYPE_VARCHAR || type_info->type() == OLAP_FIELD_TYPE_CHAR;
+    std::unique_ptr<ColumnVectorBatch> cvb;
+    ASSERT_TRUE(ColumnVectorBatch::create(first_count, true, type_info, nullptr, &cvb).ok());
+    memcpy(cvb->mutable_cell_ptr(0), typed_src, first_count * sizeof(Type));
+    cvb->set_null_bits(0, first_count, false);
+    ASSERT_TRUE(cvb->resize(data_size).ok());
+    memcpy(cvb->mutable_cell_ptr(first_count), typed_src + first_count, rest_count * sizeof(Type));
+    cvb->set_null_bits(first_count, rest_count, false);
+    for (size_t idx = 0; idx < data_size; ++idx) {
+        const auto* cell = cvb->cell_ptr(idx);
+        if (slice_cells) {
+            ASSERT_EQ(reinterpret_cast<const Slice*>(src_data)[idx].to_string(),
+                      reinterpret_cast<const Slice*>(cell)->to_string()) << "idx:" << idx;
+        } else {
+            ASSERT_EQ(typed_src[idx], *reinterpret_cast<const Type*>(cell));
+        }
+    }
+}
+
+// Builds an array batch with items of field type `item_type`, fills it in two steps (first half
+// of the lists, then resize() and the rest) and checks every list reads back equal to `result`.
+// `ordinals` has list_size + 1 entries: the first-item offset of each list plus the end offset.
+template<FieldType item_type>
+void test_read_write_list_column_vector(const ArrayTypeInfo* list_type_info,
+        segment_v2::ordinal_t* ordinals, // n + 1
+        size_t list_size,
+        const uint8_t* src_item_data,
+        Collection* result) {
+    DCHECK(list_size > 1);
+    using ItemType = typename TypeTraits<item_type>::CppType;
+    const ItemType* src_item = reinterpret_cast<const ItemType*>(src_item_data);
+    const size_t ITEM_TYPE_SIZE = sizeof(ItemType);
+
+    TabletColumn list_column(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY);
+    TabletColumn item_column(OLAP_FIELD_AGGREGATION_NONE, item_type, true, 0, 0);
+    list_column.add_sub_column(item_column);
+    // Held by unique_ptr: a failing ASSERT_* returns early and would otherwise leak the Field.
+    std::unique_ptr<Field> field(FieldFactory::create(list_column));
+    size_t list_init_size = list_size / 2;
+    std::unique_ptr<ColumnVectorBatch> cvb;
+    ASSERT_TRUE(ColumnVectorBatch::create(list_init_size, true, list_type_info, field.get(), &cvb).ok());
+
+    ArrayColumnVectorBatch* list_cvb = reinterpret_cast<ArrayColumnVectorBatch*>(cvb.get());
+    ColumnVectorBatch* item_cvb = list_cvb->elements();
+
+    // first write
+    list_cvb->put_item_ordinal(ordinals, 0, list_init_size + 1);
+    list_cvb->set_null_bits(0, list_init_size, false);
+    size_t first_write_item = ordinals[list_init_size] - ordinals[0];
+    ASSERT_TRUE(item_cvb->resize(first_write_item).ok());
+    memcpy(item_cvb->mutable_cell_ptr(0), src_item, first_write_item * ITEM_TYPE_SIZE);
+    item_cvb->set_null_bits(0, first_write_item, false);
+    list_cvb->prepare_for_read(0, list_init_size, false);
+
+    // second write
+    ASSERT_TRUE(list_cvb->resize(list_size).ok());
+    list_cvb->put_item_ordinal(ordinals + list_init_size, list_init_size, list_size - list_init_size + 1);
+    list_cvb->set_null_bits(list_init_size, list_size - list_init_size, false);
+    size_t item_size = ordinals[list_size] - ordinals[0];
+    ASSERT_TRUE(item_cvb->resize(item_size).ok());
+    size_t second_write_item = item_size - first_write_item;
+    memcpy(item_cvb->mutable_cell_ptr(first_write_item), src_item + first_write_item,
+           second_write_item * ITEM_TYPE_SIZE);
+    item_cvb->set_null_bits(first_write_item, second_write_item, false);
+    list_cvb->prepare_for_read(0, list_size, false);
+
+    for (size_t idx = 0; idx < list_size; ++idx) {
+        ASSERT_TRUE(list_type_info->equal(&result[idx], list_cvb->cell_ptr(idx))) << "idx:" << idx;
+    }
+}
+
+// Round-trips 1024 TINYINT cells and 1024 CHAR cells through a scalar ColumnVectorBatch.
+TEST_F(ColumnVectorTest, scalar_column_vector_test) {
+    const size_t size = 1024;
+    {
+        // Source bytes are the loop index narrowed to uint8_t, so values wrap modulo 256.
+        std::unique_ptr<uint8_t[]> val(new uint8_t[size]);
+        for (size_t i = 0; i < size; ++i) {
+            val[i] = static_cast<uint8_t>(i);
+        }
+        const TypeInfo* ti = get_scalar_type_info(OLAP_FIELD_TYPE_TINYINT);
+        test_read_write_scalar_column_vector<OLAP_FIELD_TYPE_TINYINT>(ti, val.get(), size);
+    }
+    {
+        // Each Slice is filled by set_column_value_by_type from its index, passing _pool.
+        std::unique_ptr<Slice[]> char_vals(new Slice[size]);
+        for (size_t i = 0; i < size; ++i) {
+            set_column_value_by_type(OLAP_FIELD_TYPE_CHAR, static_cast<int>(i), reinterpret_cast<char*>(&char_vals[i]), &_pool, 8);
+        }
+        const TypeInfo* ti = get_scalar_type_info(OLAP_FIELD_TYPE_CHAR);
+        test_read_write_scalar_column_vector<OLAP_FIELD_TYPE_CHAR>(ti, reinterpret_cast<const uint8_t*>(char_vals.get()), size);
+    }
+}
+
+// Round-trips 1024 three-item TINYINT lists through an array ColumnVectorBatch.
+TEST_F(ColumnVectorTest, list_column_vector_test) {
+    const size_t num_list = 1024;
+    const size_t num_item = num_list * 3;
+    {
+        // Declared in this order so destruction (reverse order) releases the ordinals, then
+        // the lists, then the item bytes the lists point into.
+        std::unique_ptr<uint8_t[]> item_val(new uint8_t[num_item]);
+        std::unique_ptr<Collection[]> list_val(new Collection[num_list]);
+        std::unique_ptr<segment_v2::ordinal_t[]> ordinals(new segment_v2::ordinal_t[num_list + 1]);
+        // One shared all-false null map: no item in any list is null.
+        bool null_signs[3] = {false, false, false};
+        for (size_t i = 0; i < num_item; ++i) {
+            item_val[i] = static_cast<uint8_t>(i);
+            if (i % 3 == 0) {
+                // Every third item starts a new list of length 3.
+                const size_t list_index = i / 3;
+                list_val[list_index].data = &item_val[i];
+                list_val[list_index].null_signs = null_signs;
+                list_val[list_index].length = 3;
+                ordinals[list_index] = i;
+            }
+        }
+        ordinals[num_list] = num_item; // end offset of the last list
+        auto type_info = reinterpret_cast<ArrayTypeInfo*>(ArrayTypeInfoResolver::instance()->get_type_info(OLAP_FIELD_TYPE_TINYINT));
+        test_read_write_list_column_vector<OLAP_FIELD_TYPE_TINYINT>(type_info, ordinals.get(), num_list, item_val.get(), list_val.get());
+    }
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) { // standalone entry point for this test binary
+    ::testing::InitGoogleTest(&argc, argv); // lets gtest consume its own command-line flags
+    return RUN_ALL_TESTS(); // the gtest result becomes the process exit status
+}
+
diff --git a/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp b/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp
index 826ad29..35cf6f6 100644
--- a/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp
+++ b/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp
@@ -82,13 +82,15 @@ public:
         //check values
         auto tracker = std::make_shared<MemTracker>();
         MemPool pool(tracker.get());
-        TypeInfo* type_info = get_type_info(OLAP_FIELD_TYPE_VARCHAR);
+        TypeInfo* type_info = get_scalar_type_info(OLAP_FIELD_TYPE_VARCHAR);
         size_t size = slices.size();
-        Slice* values = reinterpret_cast<Slice*>(pool.allocate(size * sizeof(Slice)));
-        ColumnBlock column_block(type_info, (uint8_t*)values, nullptr, size, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb;
+        ColumnVectorBatch::create(size, false, type_info, nullptr, &cvb);
+        ColumnBlock column_block(cvb.get(), &pool);
         ColumnBlockView block_view(&column_block);
 
         status = page_decoder.next_batch(&size, &block_view);
+        Slice* values = reinterpret_cast<Slice*>(column_block.data());
         ASSERT_TRUE(status.ok());
         ASSERT_EQ(slices.size(), size);
         ASSERT_EQ("Individual", values[0].to_string());
@@ -172,10 +174,12 @@ public:
             //check values
             auto tracker = std::make_shared<MemTracker>();
             MemPool pool(tracker.get());
-            TypeInfo* type_info = get_type_info(OLAP_FIELD_TYPE_VARCHAR);
-            Slice* values = reinterpret_cast<Slice*>(pool.allocate(sizeof(Slice)));
-            ColumnBlock column_block(type_info, (uint8_t*)values, nullptr, 1, &pool);
+            TypeInfo* type_info = get_scalar_type_info(OLAP_FIELD_TYPE_VARCHAR);
+            std::unique_ptr<ColumnVectorBatch> cvb;
+            ColumnVectorBatch::create(1, false, type_info, nullptr, &cvb);
+            ColumnBlock column_block(cvb.get(), &pool);
             ColumnBlockView block_view(&column_block);
+            Slice* values = reinterpret_cast<Slice*>(column_block.data());
 
             size_t num = 1;
             size_t pos = random() % (page_start_ids[slice_index + 1] - page_start_ids[slice_index]);
diff --git a/be/test/olap/rowset/segment_v2/binary_plain_page_test.cpp b/be/test/olap/rowset/segment_v2/binary_plain_page_test.cpp
index d155e94..5cb38b1 100644
--- a/be/test/olap/rowset/segment_v2/binary_plain_page_test.cpp
+++ b/be/test/olap/rowset/segment_v2/binary_plain_page_test.cpp
@@ -72,12 +72,13 @@ public:
         auto tracker = std::make_shared<MemTracker>();
         MemPool pool(tracker.get());
         size_t size = 3;
-        Slice* values = reinterpret_cast<Slice*>(pool.allocate(size * sizeof(Slice)));
-        uint8_t* null_bitmap = reinterpret_cast<uint8_t*>(pool.allocate(BitmapSize(size)));
-        ColumnBlock block(get_type_info(OLAP_FIELD_TYPE_VARCHAR), (uint8_t*)values, null_bitmap, size, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb;
+        ColumnVectorBatch::create(size, true, get_scalar_type_info(OLAP_FIELD_TYPE_VARCHAR), nullptr, &cvb);
+        ColumnBlock block(cvb.get(), &pool);
         ColumnBlockView column_block_view(&block);
 
         status = page_decoder.next_batch(&size, &column_block_view);
+        Slice* values = reinterpret_cast<Slice*>(block.data());
         ASSERT_TRUE(status.ok());
 
         Slice* value = reinterpret_cast<Slice*>(values);
@@ -86,13 +87,15 @@ public:
         ASSERT_EQ (",", value[1].to_string());
         ASSERT_EQ ("Doris", value[2].to_string());
         
-        Slice* values2 = reinterpret_cast<Slice*>(pool.allocate(size * sizeof(Slice)));
-        ColumnBlock block2(get_type_info(OLAP_FIELD_TYPE_VARCHAR), (uint8_t*)values2, null_bitmap, 1, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb2;
+        ColumnVectorBatch::create(1, true, get_scalar_type_info(OLAP_FIELD_TYPE_VARCHAR), nullptr, &cvb2);
+        ColumnBlock block2(cvb2.get(), &pool);
         ColumnBlockView column_block_view2(&block2);
 
         size_t fetch_num = 1;
         page_decoder.seek_to_position_in_page(2);
         status = page_decoder.next_batch(&fetch_num, &column_block_view2);
+        Slice* values2 = reinterpret_cast<Slice*>(block2.data());
         ASSERT_TRUE(status.ok());
         Slice* value2 = reinterpret_cast<Slice*>(values2);
         ASSERT_EQ (1, fetch_num);
diff --git a/be/test/olap/rowset/segment_v2/binary_prefix_page_test.cpp b/be/test/olap/rowset/segment_v2/binary_prefix_page_test.cpp
index 02dd7bf..670165d 100644
--- a/be/test/olap/rowset/segment_v2/binary_prefix_page_test.cpp
+++ b/be/test/olap/rowset/segment_v2/binary_prefix_page_test.cpp
@@ -74,26 +74,30 @@ class BinaryPrefixPageTest : public testing::Test {
         //check values
         auto tracker = std::make_shared<MemTracker>();
         MemPool pool(tracker.get());
-        TypeInfo* type_info = get_type_info(OLAP_FIELD_TYPE_VARCHAR);
+        TypeInfo* type_info = get_scalar_type_info(OLAP_FIELD_TYPE_VARCHAR);
         size_t size = slices.size();
-        Slice* values = reinterpret_cast<Slice*>(pool.allocate(size * sizeof(Slice)));
-        ColumnBlock column_block(type_info, (uint8_t*)values, nullptr, size, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb;
+        ColumnVectorBatch::create(size, false, type_info, nullptr, &cvb);
+        ColumnBlock column_block(cvb.get(), &pool);
         ColumnBlockView block_view(&column_block);
 
         ret = page_decoder->next_batch(&size, &block_view);
+        Slice* values = reinterpret_cast<Slice*>(column_block.data());
         ASSERT_TRUE(ret.ok());
         ASSERT_EQ(slices.size(), size);
         for (int i = 1000; i < 1038; ++i) {
             ASSERT_EQ(std::to_string(i), values[i - 1000].to_string());
         }
 
-        values = reinterpret_cast<Slice*>(pool.allocate(size * sizeof(Slice)));
-        ColumnBlock column_block2(type_info, (uint8_t*)values, nullptr, size, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb2;
+        ColumnVectorBatch::create(size, false, type_info, nullptr, &cvb2);
+        ColumnBlock column_block2(cvb2.get(), &pool);
         ColumnBlockView block_view2(&column_block2);
         ret = page_decoder->seek_to_position_in_page(15);
         ASSERT_TRUE(ret.ok());
 
         ret = page_decoder->next_batch(&size, &block_view2);
+        values = reinterpret_cast<Slice*>(column_block2.data());
         ASSERT_TRUE(ret.ok());
         ASSERT_EQ(23, size);
         for (int i = 1015; i < 1038; ++i) {
@@ -146,14 +150,6 @@ class BinaryPrefixPageTest : public testing::Test {
         ret = page_decoder->init();
         ASSERT_TRUE(ret.ok());
 
-        auto tracker = std::make_shared<MemTracker>();
-        MemPool pool(tracker.get());
-        TypeInfo* type_info = get_type_info(OLAP_FIELD_TYPE_VARCHAR);
-        size_t size = slices.size();
-        Slice* values = reinterpret_cast<Slice*>(pool.allocate(size * sizeof(Slice)));
-        ColumnBlock column_block(type_info, (uint8_t*)values, nullptr, size, &pool);
-        ColumnBlockView block_view(&column_block);
-
         Slice slice("c");
         bool exact_match;
         ret = page_decoder->seek_at_or_after_value(&slice, &exact_match);
diff --git a/be/test/olap/rowset/segment_v2/bitshuffle_page_test.cpp b/be/test/olap/rowset/segment_v2/bitshuffle_page_test.cpp
index 4860f2d..ec37c6e 100644
--- a/be/test/olap/rowset/segment_v2/bitshuffle_page_test.cpp
+++ b/be/test/olap/rowset/segment_v2/bitshuffle_page_test.cpp
@@ -38,13 +38,15 @@ public:
     void copy_one(PageDecoderType* decoder, typename TypeTraits<type>::CppType* ret) {
         auto tracker = std::make_shared<MemTracker>();
         MemPool pool(tracker.get());
-        uint8_t null_bitmap = 0;
-        ColumnBlock block(get_type_info(type), (uint8_t*)ret, &null_bitmap, 1, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb;
+        ColumnVectorBatch::create(1, true, get_scalar_type_info(type), nullptr, &cvb);
+        ColumnBlock block(cvb.get(), &pool);
         ColumnBlockView column_block_view(&block);
 
         size_t n = 1;
         decoder->_copy_next_values(n, column_block_view.data());
         ASSERT_EQ(1, n);
+        *ret = *reinterpret_cast<const typename TypeTraits<type>::CppType*>(block.cell_ptr(0));
     }
 
     template <FieldType Type, class PageBuilderType, class PageDecoderType>
@@ -75,14 +77,15 @@ public:
         auto tracker = std::make_shared<MemTracker>();
         MemPool pool(tracker.get());
 
-        CppType* values = reinterpret_cast<CppType*>(pool.allocate(size * sizeof(CppType)));
-        uint8_t* null_bitmap = reinterpret_cast<uint8_t*>(pool.allocate(BitmapSize(size)));
-        ColumnBlock block(get_type_info(Type), (uint8_t*)values, null_bitmap, size, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb;
+        ColumnVectorBatch::create(size, false, get_scalar_type_info(Type), nullptr, &cvb);
+        ColumnBlock block(cvb.get(), &pool);
         ColumnBlockView column_block_view(&block);
 
         status = page_decoder.next_batch(&size, &column_block_view);
         ASSERT_TRUE(status.ok());
 
+        CppType* values = reinterpret_cast<CppType*>(block.data());
         CppType* decoded = (CppType*)values;
         for (uint i = 0; i < size; i++) {
             if (src[i] != decoded[i]) {
diff --git a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
index 1f3f664..5caaa22 100644
--- a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
+++ b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
@@ -68,7 +68,6 @@ template<FieldType type, EncodingTypePB encoding>
 void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, string test_name) {
     using Type = typename TypeTraits<type>::CppType;
     Type* src = (Type*)src_data;
-    const TypeInfo* type_info = get_type_info(type);
 
     ColumnMetaPB meta;
 
@@ -101,24 +100,25 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s
         } else if (type == OLAP_FIELD_TYPE_CHAR) {
             column = create_char_key(1);
         }
-        std::unique_ptr<Field> field(FieldFactory::create(column));
-        ColumnWriter writer(writer_opts, std::move(field), wblock.get());
-        st = writer.init();
+        std::unique_ptr<ColumnWriter> writer;
+        ColumnWriter::create(writer_opts, &column, wblock.get(), &writer);
+        st = writer->init();
         ASSERT_TRUE(st.ok()) << st.to_string();
 
         for (int i = 0; i < num_rows; ++i) {
-            st = writer.append(BitmapTest(src_is_null, i), src + i);
+            st = writer->append(BitmapTest(src_is_null, i), src + i);
             ASSERT_TRUE(st.ok());
         }
 
-        ASSERT_TRUE(writer.finish().ok());
-        ASSERT_TRUE(writer.write_data().ok());
-        ASSERT_TRUE(writer.write_ordinal_index().ok());
-        ASSERT_TRUE(writer.write_zone_map().ok());
+        ASSERT_TRUE(writer->finish().ok());
+        ASSERT_TRUE(writer->write_data().ok());
+        ASSERT_TRUE(writer->write_ordinal_index().ok());
+        ASSERT_TRUE(writer->write_zone_map().ok());
 
         // close the file
         ASSERT_TRUE(wblock->close().ok());
     }
+    const TypeInfo* type_info = get_scalar_type_info(type);
     // read and check
     {
         // read and check
@@ -148,10 +148,10 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s
 
             auto tracker = std::make_shared<MemTracker>();
             MemPool pool(tracker.get());
-            Type vals[1024];
-            Type* vals_ = vals;
-            uint8_t is_null[1024];
-            ColumnBlock col(type_info, (uint8_t*)vals, is_null, 1024, &pool);
+            std::unique_ptr<ColumnVectorBatch> cvb;
+            ColumnVectorBatch::create(0, true, type_info, nullptr, &cvb);
+            cvb->resize(1024);
+            ColumnBlock col(cvb.get(), &pool);
 
             int idx = 0;
             while (true) {
@@ -160,15 +160,14 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s
                 st = iter->next_batch(&rows_read, &dst);
                 ASSERT_TRUE(st.ok());
                 for (int j = 0; j < rows_read; ++j) {
-                    ASSERT_EQ(BitmapTest(src_is_null, idx), BitmapTest(is_null, j));
-                    if (!BitmapTest(is_null, j)) {
+                    ASSERT_EQ(BitmapTest(src_is_null, idx), col.is_null(j));
+                    if (!col.is_null(j)) {
                         if (type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR) {
                             Slice* src_slice = (Slice*)src_data;
-                            Slice* dst_slice = (Slice*)vals_;
                             ASSERT_EQ(src_slice[idx].to_string(),
-                                      dst_slice[j].to_string()) << "j:" << j;
+                                    reinterpret_cast<const Slice*>(col.cell_ptr(j))->to_string()) << "j:" << j;
                         } else {
-                            ASSERT_EQ(src[idx], vals[j]);
+                            ASSERT_EQ(src[idx], *reinterpret_cast<const Type*>(col.cell_ptr(j)));
                         }
                     }
                     idx++;
@@ -182,9 +181,10 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s
         {
             auto tracker = std::make_shared<MemTracker>();
             MemPool pool(tracker.get());
-            Type vals[1024];
-            uint8_t is_null[1024];
-            ColumnBlock col(type_info, (uint8_t*)vals, is_null, 1024, &pool);
+            std::unique_ptr<ColumnVectorBatch> cvb;
+            ColumnVectorBatch::create(0, true, type_info, nullptr, &cvb);
+            cvb->resize(1024);
+            ColumnBlock col(cvb.get(), &pool);
 
             for (int rowid = 0; rowid < num_rows; rowid += 4025) {
                 st = iter->seek_to_ordinal(rowid);
@@ -193,17 +193,18 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s
                 int idx = rowid;
                 size_t rows_read = 1024;
                 ColumnBlockView dst(&col);
+
                 st = iter->next_batch(&rows_read, &dst);
                 ASSERT_TRUE(st.ok());
                 for (int j = 0; j < rows_read; ++j) {
-                    ASSERT_EQ(BitmapTest(src_is_null, idx), BitmapTest(is_null, j));
-                    if (!BitmapTest(is_null, j)) {
+                    ASSERT_EQ(BitmapTest(src_is_null, idx), col.is_null(j));
+                    if (!col.is_null(j)) {
                         if (type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR) {
                             Slice* src_slice = (Slice*)src_data;
-                            Slice* dst_slice = (Slice*)vals;
-                            ASSERT_EQ(src_slice[idx].to_string(), dst_slice[j].to_string());
+                            ASSERT_EQ(src_slice[idx].to_string(),
+                                    reinterpret_cast<const Slice*>(col.cell_ptr(j))->to_string());
                         } else {
-                            ASSERT_EQ(src[idx], vals[j]);
+                            ASSERT_EQ(src[idx], *reinterpret_cast<const Type*>(col.cell_ptr(j)));
                         }
                     }
                     idx++;
@@ -215,17 +216,224 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s
     }
 }
 
+// Writes `num_rows` nullable array cells (items of field type `item_type`) to the file
+// TEST_DIR/`test_name`, reads them back sequentially and after seek_to_ordinal(), and checks
+// null flags and contents against `src_is_null` / `src_data`. `field` and the iterator live in
+// unique_ptrs so a fatal ASSERT_* (which returns from this function) cannot leak them.
+template<FieldType item_type, EncodingTypePB item_encoding, EncodingTypePB array_encoding>
+void test_array_nullable_data(Collection* src_data, uint8_t* src_is_null, int num_rows, string test_name) {
+    ColumnMetaPB meta;
+    TabletColumn list_column(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY);
+    int32 item_length = 0;
+    if (item_type == OLAP_FIELD_TYPE_CHAR || item_type == OLAP_FIELD_TYPE_VARCHAR) {
+        item_length = 10;
+    }
+    TabletColumn item_column(OLAP_FIELD_AGGREGATION_NONE, item_type, true, 0, item_length);
+    list_column.add_sub_column(item_column);
+    std::unique_ptr<Field> field(FieldFactory::create(list_column));
+
+    // write data
+    string fname = TEST_DIR + "/" + test_name;
+    {
+        std::unique_ptr<fs::WritableBlock> wblock;
+        fs::CreateBlockOptions opts({ fname });
+        Status st = fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock);
+        ASSERT_TRUE(st.ok()) << st.get_error_msg();
+
+        ColumnWriterOptions writer_opts;
+        writer_opts.meta = &meta;
+        writer_opts.meta->set_column_id(0);
+        writer_opts.meta->set_unique_id(0);
+        writer_opts.meta->set_type(OLAP_FIELD_TYPE_ARRAY);
+        writer_opts.meta->set_length(0);
+        writer_opts.meta->set_encoding(array_encoding);
+        writer_opts.meta->set_compression(segment_v2::CompressionTypePB::LZ4F);
+        writer_opts.meta->set_is_nullable(true);
+        writer_opts.data_page_size = 5 * 8;
+        // describe the item column as the single child of the array column
+        ColumnMetaPB* child_meta = meta.add_children_columns();
+        child_meta->set_column_id(1);
+        child_meta->set_unique_id(1);
+        child_meta->set_type(item_type);
+        child_meta->set_length(item_length);
+        child_meta->set_encoding(item_encoding);
+        child_meta->set_compression(segment_v2::CompressionTypePB::LZ4F);
+        child_meta->set_is_nullable(true);
+
+        std::unique_ptr<ColumnWriter> writer;
+        ColumnWriter::create(writer_opts, &list_column, wblock.get(), &writer);
+        st = writer->init();
+        ASSERT_TRUE(st.ok()) << st.to_string();
+
+        for (int i = 0; i < num_rows; ++i) {
+            st = writer->append(BitmapTest(src_is_null, i), src_data + i);
+            ASSERT_TRUE(st.ok());
+        }
+
+        st = writer->finish();
+        ASSERT_TRUE(st.ok());
+
+        st = writer->write_data();
+        ASSERT_TRUE(st.ok());
+        st = writer->write_ordinal_index();
+        ASSERT_TRUE(st.ok());
+
+        // close the file
+        ASSERT_TRUE(wblock->close().ok());
+    }
+    TypeInfo* type_info = get_type_info(&meta);
+
+    // read and check
+    {
+        ColumnReaderOptions reader_opts;
+        std::unique_ptr<ColumnReader> reader;
+        auto st = ColumnReader::create(reader_opts, meta, num_rows, fname, &reader);
+        ASSERT_TRUE(st.ok());
+
+        ColumnIterator* raw_iter = nullptr;
+        st = reader->new_iterator(&raw_iter);
+        std::unique_ptr<ColumnIterator> iter(raw_iter); // take ownership before any early return
+        ASSERT_TRUE(st.ok());
+        std::unique_ptr<fs::ReadableBlock> rblock;
+        fs::BlockManager* block_manager = fs::fs_util::block_mgr_for_ut();
+        st = block_manager->open_block(fname, &rblock);
+        ASSERT_TRUE(st.ok());
+        ColumnIteratorOptions iter_opts;
+        OlapReaderStatistics stats;
+        iter_opts.stats = &stats;
+        iter_opts.rblock = rblock.get();
+        st = iter->init(iter_opts);
+        ASSERT_TRUE(st.ok());
+        // sequence read
+        {
+            st = iter->seek_to_first();
+            ASSERT_TRUE(st.ok()) << st.to_string();
+
+            MemTracker tracker;
+            MemPool pool(&tracker);
+            std::unique_ptr<ColumnVectorBatch> cvb;
+            ASSERT_TRUE(ColumnVectorBatch::create(0, true, type_info, field.get(), &cvb).ok());
+            ASSERT_TRUE(cvb->resize(1024).ok());
+            ColumnBlock col(cvb.get(), &pool);
+
+            int idx = 0;
+            while (true) {
+                size_t rows_read = 1024;
+                ColumnBlockView dst(&col);
+                st = iter->next_batch(&rows_read, &dst);
+                ASSERT_TRUE(st.ok());
+                for (size_t j = 0; j < rows_read; ++j) {
+                    ASSERT_EQ(BitmapTest(src_is_null, idx), col.is_null(j));
+                    if (!col.is_null(j)) {
+                        ASSERT_TRUE(type_info->equal(&src_data[idx], col.cell_ptr(j)));
+                    }
+                    ++idx;
+                }
+                if (rows_read < 1024) {
+                    break;
+                }
+            }
+        }
+        // seek read
+        {
+            MemTracker tracker;
+            MemPool pool(&tracker);
+            std::unique_ptr<ColumnVectorBatch> cvb;
+            ASSERT_TRUE(ColumnVectorBatch::create(0, true, type_info, field.get(), &cvb).ok());
+            ASSERT_TRUE(cvb->resize(1024).ok());
+            ColumnBlock col(cvb.get(), &pool);
+
+            for (int rowid = 0; rowid < num_rows; rowid += 4025) {
+                st = iter->seek_to_ordinal(rowid);
+                ASSERT_TRUE(st.ok());
+                int idx = rowid;
+                size_t rows_read = 1024;
+                ColumnBlockView dst(&col);
+                st = iter->next_batch(&rows_read, &dst);
+                ASSERT_TRUE(st.ok());
+                for (size_t j = 0; j < rows_read; ++j) {
+                    ASSERT_EQ(BitmapTest(src_is_null, idx), col.is_null(j));
+                    if (!col.is_null(j)) {
+                        ASSERT_TRUE(type_info->equal(&src_data[idx], col.cell_ptr(j)));
+                    }
+                    ++idx;
+                }
+            }
+        }
+        iter.reset(); // release the iterator before rblock and reader are destroyed
+    }
+}
+
+
+// Round-trips 24K nullable arrays through the array column writer/reader twice: first with
+// TINYINT items, then with VARCHAR items. Every list whose index % 4 == 1 is a null array.
+TEST_F(ColumnReaderWriterTest, test_array_type) {
+    const size_t num_list = 24 * 1024;
+    const size_t num_item = num_list * 3;
+
+    // Every bit of the null bitmap is written by BitmapChange before the helper reads it.
+    std::unique_ptr<uint8_t[]> array_is_null(new uint8_t[BitmapSize(num_list)]);
+    std::unique_ptr<uint8_t[]> item_val(new uint8_t[num_item]);
+    std::unique_ptr<bool[]> item_is_null(new bool[num_item]);
+    std::unique_ptr<Slice[]> varchar_vals; // filled in for the second pass
+    std::unique_ptr<Collection[]> array_val(new Collection[num_list]);
+    // First pass: TINYINT items; every item whose index % 4 == 0 is flagged null.
+    for (size_t i = 0; i < num_item; ++i) {
+        item_val[i] = static_cast<uint8_t>(i);
+        item_is_null[i] = (i % 4) == 0;
+        if (i % 3 == 0) {
+            // Every third item starts a new list of length 3.
+            const size_t list_index = i / 3;
+            const bool is_null = (list_index % 4) == 1;
+            BitmapChange(array_is_null.get(), list_index, is_null);
+            if (is_null) {
+                continue;
+            }
+            array_val[list_index].data = &item_val[i];
+            array_val[list_index].null_signs = &item_is_null[i];
+            array_val[list_index].length = 3;
+        }
+    }
+    test_array_nullable_data<OLAP_FIELD_TYPE_TINYINT, BIT_SHUFFLE, BIT_SHUFFLE>(array_val.get(), array_is_null.get(), num_list, "null_array_bs");
+
+    // Second pass: every non-null array shares the same three VARCHAR items, the middle one null.
+    array_val.reset(new Collection[num_list]);
+    item_val.reset();
+    varchar_vals.reset(new Slice[3]);
+    item_is_null.reset(new bool[3]);
+    for (int i = 0; i < 3; ++i) {
+        item_is_null[i] = i == 1;
+        if (i != 1) {
+            set_column_value_by_type(OLAP_FIELD_TYPE_VARCHAR, i, reinterpret_cast<char*>(&varchar_vals[i]), &_pool);
+        }
+    }
+    for (size_t i = 0; i < num_list; ++i) {
+        const bool is_null = (i % 4) == 1;
+        BitmapChange(array_is_null.get(), i, is_null);
+        if (is_null) {
+            continue;
+        }
+        array_val[i].data = varchar_vals.get();
+        array_val[i].null_signs = item_is_null.get();
+        array_val[i].length = 3;
+    }
+    test_array_nullable_data<OLAP_FIELD_TYPE_VARCHAR, DICT_ENCODING, BIT_SHUFFLE>(array_val.get(), array_is_null.get(), num_list, "null_array_chars");
+
+    // Buffers are released by their unique_ptr owners when the test body returns.
+}
+
+
 template<FieldType type>
 void test_read_default_value(string value, void* result) {
     using Type = typename TypeTraits<type>::CppType;
-    const TypeInfo* type_info = get_type_info(type);
+    TypeInfo* type_info = get_type_info(type);
     // read and check
     {
         TabletColumn tablet_column = create_with_default_value<type>(value);
         DefaultValueColumnIterator iter(tablet_column.has_default_value(),
                                         tablet_column.default_value(),
                                         tablet_column.is_nullable(),
-                                        tablet_column.type(),
+                                        type_info,
                                         tablet_column.length());
         ColumnIteratorOptions iter_opts;
         auto st = iter.init(iter_opts);
@@ -237,27 +445,27 @@ void test_read_default_value(string value, void* result) {
 
             auto tracker = std::make_shared<MemTracker>();
             MemPool pool(tracker.get());
-            Type vals[1024];
-            Type* vals_ = vals;
-            uint8_t is_null[1024];
-            ColumnBlock col(type_info, (uint8_t*)vals, is_null, 1024, &pool);
+            std::unique_ptr<ColumnVectorBatch> cvb;
+            ColumnVectorBatch::create(0, true, type_info, nullptr, &cvb);
+            cvb->resize(1024);
+            ColumnBlock col(cvb.get(), &pool);
 
             int idx = 0;
             size_t rows_read = 1024;
             ColumnBlockView dst(&col);
-            st = iter.next_batch(&rows_read, &dst);
+            bool has_null;
+            st = iter.next_batch(&rows_read, &dst, &has_null);
             ASSERT_TRUE(st.ok());
             for (int j = 0; j < rows_read; ++j) {
                 if (type == OLAP_FIELD_TYPE_CHAR) {
-                    Slice* dst_slice = (Slice*)vals_;
-                    ASSERT_EQ(*(string*)result, dst_slice[j].to_string()) << "j:" << j;
+                    ASSERT_EQ(*(string*)result, reinterpret_cast<const Slice*>(col.cell_ptr(j))->to_string()) << "j:" << j;
                 } else if (type == OLAP_FIELD_TYPE_VARCHAR
                         || type == OLAP_FIELD_TYPE_HLL
                         || type == OLAP_FIELD_TYPE_OBJECT) {
-                    Slice* dst_slice = (Slice*)vals_;
-                    ASSERT_EQ(value, dst_slice[j].to_string()) << "j:" << j;
+                    ASSERT_EQ(value, reinterpret_cast<const Slice*>(col.cell_ptr(j))->to_string()) << "j:" << j;
                 } else {
-                    ASSERT_EQ(*(Type*)result, vals[j]);
+                    ;
+                    ASSERT_EQ(*(Type*)result, *(reinterpret_cast<const Type*>(col.cell_ptr(j))));
                 }
                 idx++;
             }
@@ -266,9 +474,10 @@ void test_read_default_value(string value, void* result) {
         {
             auto tracker = std::make_shared<MemTracker>();
             MemPool pool(tracker.get());
-            Type vals[1024];
-            uint8_t is_null[1024];
-            ColumnBlock col(type_info, (uint8_t*)vals, is_null, 1024, &pool);
+            std::unique_ptr<ColumnVectorBatch> cvb;
+            ColumnVectorBatch::create(0, true, type_info, nullptr, &cvb);
+            cvb->resize(1024);
+            ColumnBlock col(cvb.get(), &pool);
 
             for (int rowid = 0; rowid < 2048; rowid += 128) {
                 st = iter.seek_to_ordinal(rowid);
@@ -277,19 +486,18 @@ void test_read_default_value(string value, void* result) {
                 int idx = rowid;
                 size_t rows_read = 1024;
                 ColumnBlockView dst(&col);
-                st = iter.next_batch(&rows_read, &dst);
+                bool has_null;
+                st = iter.next_batch(&rows_read, &dst, &has_null);
                 ASSERT_TRUE(st.ok());
                 for (int j = 0; j < rows_read; ++j) {
                     if (type == OLAP_FIELD_TYPE_CHAR) {
-                        Slice* dst_slice = (Slice*)vals;
-                        ASSERT_EQ(*(string*)result, dst_slice[j].to_string()) << "j:" << j;
+                        ASSERT_EQ(*(string*)result, reinterpret_cast<const Slice*>(col.cell_ptr(j))->to_string()) << "j:" << j;
                     } else if (type == OLAP_FIELD_TYPE_VARCHAR
                             || type == OLAP_FIELD_TYPE_HLL
                             || type == OLAP_FIELD_TYPE_OBJECT) {
-                        Slice* dst_slice = (Slice*)vals;
-                        ASSERT_EQ(value, dst_slice[j].to_string());
+                        ASSERT_EQ(value, reinterpret_cast<const Slice*>(col.cell_ptr(j))->to_string());
                     } else {
-                        ASSERT_EQ(*(Type*)result, vals[j]);
+                        ASSERT_EQ(*(Type*)result, *(reinterpret_cast<const Type*>(col.cell_ptr(j))));
                     }
                     idx++;
                 }
diff --git a/be/test/olap/rowset/segment_v2/encoding_info_test.cpp b/be/test/olap/rowset/segment_v2/encoding_info_test.cpp
index b4690f4..ed0727b 100644
--- a/be/test/olap/rowset/segment_v2/encoding_info_test.cpp
+++ b/be/test/olap/rowset/segment_v2/encoding_info_test.cpp
@@ -35,7 +35,7 @@ public:
 };
 
 TEST_F(EncodingInfoTest, normal) {
-    auto type_info = get_type_info(OLAP_FIELD_TYPE_BIGINT);
+    auto type_info = get_scalar_type_info(OLAP_FIELD_TYPE_BIGINT);
     const EncodingInfo* encoding_info = nullptr;
     auto status = EncodingInfo::get(type_info, PLAIN_ENCODING, &encoding_info);
     ASSERT_TRUE(status.ok());
@@ -43,7 +43,7 @@ TEST_F(EncodingInfoTest, normal) {
 }
 
 TEST_F(EncodingInfoTest, no_encoding) {
-    auto type_info = get_type_info(OLAP_FIELD_TYPE_BIGINT);
+    auto type_info = get_scalar_type_info(OLAP_FIELD_TYPE_BIGINT);
     const EncodingInfo* encoding_info = nullptr;
     auto status = EncodingInfo::get(type_info, DICT_ENCODING, &encoding_info);
     ASSERT_FALSE(status.ok());
diff --git a/be/test/olap/rowset/segment_v2/frame_of_reference_page_test.cpp b/be/test/olap/rowset/segment_v2/frame_of_reference_page_test.cpp
index cc5c904..f96ad20 100644
--- a/be/test/olap/rowset/segment_v2/frame_of_reference_page_test.cpp
+++ b/be/test/olap/rowset/segment_v2/frame_of_reference_page_test.cpp
@@ -37,13 +37,15 @@ public:
     void copy_one(PageDecoderType* decoder, typename TypeTraits<type>::CppType* ret) {
         auto tracker = std::make_shared<MemTracker>();
         MemPool pool(tracker.get());
-        uint8_t null_bitmap = 0;
-        ColumnBlock block(get_type_info(type), (uint8_t*)ret, &null_bitmap, 1, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb;
+        ColumnVectorBatch::create(1, true, get_scalar_type_info(type), nullptr, &cvb);
+        ColumnBlock block(cvb.get(), &pool);
         ColumnBlockView column_block_view(&block);
 
         size_t n = 1;
         decoder->next_batch(&n, &column_block_view);
         ASSERT_EQ(1, n);
+        *ret = *reinterpret_cast<const typename TypeTraits<type>::CppType*>(block.cell_ptr(0));
     }
 
     template <FieldType Type, class PageBuilderType, class PageDecoderType>
@@ -68,15 +70,17 @@ public:
 
         auto tracker = std::make_shared<MemTracker>();
         MemPool pool(tracker.get());
-        CppType* values = reinterpret_cast<CppType*>(pool.allocate(size * sizeof(CppType)));
-        uint8_t* null_bitmap = reinterpret_cast<uint8_t*>(pool.allocate(BitmapSize(size)));
-        ColumnBlock block(get_type_info(Type), (uint8_t*)values, null_bitmap, size, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb;
+        ColumnVectorBatch::create(size, true, get_scalar_type_info(Type), nullptr, &cvb);
+        ColumnBlock block(cvb.get(), &pool);
         ColumnBlockView column_block_view(&block);
         size_t size_to_fetch = size;
         status = for_page_decoder.next_batch(&size_to_fetch, &column_block_view);
         ASSERT_TRUE(status.ok());
         ASSERT_EQ(size, size_to_fetch);
 
+        CppType* values = reinterpret_cast<CppType*>(column_block_view.data());
+
         for (uint i = 0; i < size; i++) {
             if (src[i] != values[i]) {
                 FAIL() << "Fail at index " << i <<
diff --git a/be/test/olap/rowset/segment_v2/plain_page_test.cpp b/be/test/olap/rowset/segment_v2/plain_page_test.cpp
index 1f0e164..aa23b25 100644
--- a/be/test/olap/rowset/segment_v2/plain_page_test.cpp
+++ b/be/test/olap/rowset/segment_v2/plain_page_test.cpp
@@ -48,13 +48,15 @@ public:
     void copy_one(PageDecoderType* decoder, typename TypeTraits<type>::CppType* ret) {
         auto tracker = std::make_shared<MemTracker>();
         MemPool pool(tracker.get());
-        uint8_t null_bitmap = 0;
-        ColumnBlock block(get_type_info(type), (uint8_t*)ret, &null_bitmap, 1, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb;
+        ColumnVectorBatch::create(1, true, get_scalar_type_info(type), nullptr, &cvb);
+        ColumnBlock block(cvb.get(), &pool);
         ColumnBlockView column_block_view(&block);
 
         size_t n = 1;
         decoder->next_batch(&n, &column_block_view);
         ASSERT_EQ(1, n);
+        *ret = *reinterpret_cast<const typename TypeTraits<type>::CppType*>(block.cell_ptr(0));
     }
 
     template <FieldType Type, class PageBuilderType, class PageDecoderType>
@@ -87,14 +89,14 @@ public:
         auto tracker = std::make_shared<MemTracker>();
         MemPool pool(tracker.get());
 
-        CppType* values = reinterpret_cast<CppType*>(pool.allocate(size * sizeof(CppType)));
-        uint8_t* null_bitmap = reinterpret_cast<uint8_t*>(pool.allocate(BitmapSize(size)));
-        ColumnBlock block(get_type_info(Type), (uint8_t*)values, null_bitmap, size, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb;
+        ColumnVectorBatch::create(size, true, get_scalar_type_info(Type), nullptr, &cvb);
+        ColumnBlock block(cvb.get(), &pool);
         ColumnBlockView column_block_view(&block);
         status = page_decoder.next_batch(&size, &column_block_view);
         ASSERT_TRUE(status.ok());
     
-        CppType* decoded = (CppType*)values;
+        CppType* decoded = reinterpret_cast<CppType*>(block.data());
         for (uint i = 0; i < size; i++) {
             if (src[i] != decoded[i]) {
                 FAIL() << "Fail at index " << i <<
diff --git a/be/test/olap/rowset/segment_v2/rle_page_test.cpp b/be/test/olap/rowset/segment_v2/rle_page_test.cpp
index e8c35cc..86307ec 100644
--- a/be/test/olap/rowset/segment_v2/rle_page_test.cpp
+++ b/be/test/olap/rowset/segment_v2/rle_page_test.cpp
@@ -39,13 +39,15 @@ public:
     void copy_one(PageDecoderType* decoder, typename TypeTraits<type>::CppType* ret) {
         auto tracker = std::make_shared<MemTracker>();
         MemPool pool(tracker.get());
-        uint8_t null_bitmap = 0;
-        ColumnBlock block(get_type_info(type), (uint8_t*)ret, &null_bitmap, 1, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb;
+        ColumnVectorBatch::create(1, true, get_scalar_type_info(type), nullptr, &cvb);
+        ColumnBlock block(cvb.get(), &pool);
         ColumnBlockView column_block_view(&block);
 
         size_t n = 1;
         decoder->next_batch(&n, &column_block_view);
         ASSERT_EQ(1, n);
+        *ret = *reinterpret_cast<const typename TypeTraits<type>::CppType*>(block.cell_ptr(0));
     }
 
     template <FieldType Type, class PageBuilderType, class PageDecoderType>
@@ -76,15 +78,16 @@ public:
 
         auto tracker = std::make_shared<MemTracker>();
         MemPool pool(tracker.get());
-        CppType* values = reinterpret_cast<CppType*>(pool.allocate(size * sizeof(CppType)));
-        uint8_t* null_bitmap = reinterpret_cast<uint8_t*>(pool.allocate(BitmapSize(size)));
-        ColumnBlock block(get_type_info(Type), (uint8_t*)values, null_bitmap, size, &pool);
+        std::unique_ptr<ColumnVectorBatch> cvb;
+        ColumnVectorBatch::create(size, true, get_scalar_type_info(Type), nullptr, &cvb);
+        ColumnBlock block(cvb.get(), &pool);
         ColumnBlockView column_block_view(&block);
         size_t size_to_fetch = size;
         status = rle_page_decoder.next_batch(&size_to_fetch, &column_block_view);
         ASSERT_TRUE(status.ok());
         ASSERT_EQ(size, size_to_fetch);
 
+        CppType* values = reinterpret_cast<CppType*>(block.data());
         for (uint i = 0; i < size; i++) {
             if (src[i] != values[i]) {
                 FAIL() << "Fail at index " << i <<
diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp
index 43bd8fd..1d12365 100644
--- a/be/test/olap/rowset/segment_v2/segment_test.cpp
+++ b/be/test/olap/rowset/segment_v2/segment_test.cpp
@@ -182,7 +182,7 @@ TEST_F(SegmentReaderWriterTest, normal) {
                     auto column_block = block.column_block(j);
                     for (int i = 0; i < rows_read; ++i) {
                         int rid = rowid + i;
-                        ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i));
+                        ASSERT_FALSE(column_block.is_null(i));
                         ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i));
                     }
                 }
@@ -494,7 +494,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
                     auto column_block = block.column_block(j);
                     for (int i = 0; i < rows_read; ++i) {
                         int rid = rowid + i;
-                        ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i));
+                        ASSERT_FALSE(column_block.is_null(i));
                         ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i)) << "rid:" << rid << ", i:" << i;
                     }
                 }
@@ -553,7 +553,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
                     auto column_block = block.column_block(j);
                     for (int i = 0; i < rows_read; ++i) {
                         int rid = rowid + i;
-                        ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i));
+                        ASSERT_FALSE(column_block.is_null(i));
                         ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i)) << "rid:" << rid << ", i:" << i;
                     }
                 }
@@ -689,9 +689,9 @@ TEST_F(SegmentReaderWriterTest, TestDefaultValueColumn) {
                     for (int i = 0; i < rows_read; ++i) {
                         int rid = rowid + i;
                         if (cid == 4) {
-                            ASSERT_TRUE(BitmapTest(column_block.null_bitmap(), i));
+                            ASSERT_TRUE(column_block.is_null(i));
                         } else {
-                            ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i));
+                            ASSERT_FALSE(column_block.is_null(i));
                             ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i));
                         }
                     }
@@ -737,10 +737,10 @@ TEST_F(SegmentReaderWriterTest, TestDefaultValueColumn) {
                     for (int i = 0; i < rows_read; ++i) {
                         int rid = rowid + i;
                         if (cid == 4) {
-                            ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i));
+                            ASSERT_FALSE(column_block.is_null(i));
                             ASSERT_EQ(10086, *(int*)column_block.cell_ptr(i));
                         } else {
-                            ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i));
+                            ASSERT_FALSE(column_block.is_null(i));
                             ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i));
                         }
                     }
@@ -839,7 +839,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
                     auto column_block = block.column_block(j);
                     for (int i = 0; i < rows_read; ++i) {
                         int rid = rowid + i;
-                        ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i));
+                        ASSERT_FALSE(column_block.is_null(i));
                         const Slice* actual = reinterpret_cast<const Slice*>(column_block.cell_ptr(i));
 
                         Slice expect;
@@ -941,7 +941,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
                     auto column_block = block.column_block(j);
                     for (int i = 0; i < rows_read; ++i) {
                         int rid = rowid + i;
-                        ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i));
+                        ASSERT_FALSE(column_block.is_null(i));
 
                         const Slice* actual = reinterpret_cast<const Slice*>(column_block.cell_ptr(i));
                         Slice expect;
diff --git a/be/test/olap/schema_change_test.cpp b/be/test/olap/schema_change_test.cpp
index e7ede79..c7e068a 100644
--- a/be/test/olap/schema_change_test.cpp
+++ b/be/test/olap/schema_change_test.cpp
@@ -298,7 +298,7 @@ public:
             ASSERT_TRUE(dst_str.compare(0, value.size(), value) == 0);
         }
 
-        TypeInfo* tp = get_type_info(OLAP_FIELD_TYPE_HLL);
+        TypeInfo* tp = get_scalar_type_info(OLAP_FIELD_TYPE_HLL);
         st = read_row.convert_from(0, read_row.cell_ptr(0), tp, _mem_pool.get());
         ASSERT_EQ(st, OLAP_ERR_INVALID_SCHEMA);
     }
@@ -375,7 +375,7 @@ TEST_F(TestColumn, ConvertFloatToDouble) {
     ASSERT_TRUE(v2 == 1.234);
     
     //test not support type
-    TypeInfo* tp = get_type_info(OLAP_FIELD_TYPE_HLL);
+    TypeInfo* tp = get_scalar_type_info(OLAP_FIELD_TYPE_HLL);
     OLAPStatus st = read_row.convert_from(0, data, tp, _mem_pool.get());
     ASSERT_TRUE( st == OLAP_ERR_INVALID_SCHEMA);
 }
diff --git a/be/test/olap/storage_types_test.cpp b/be/test/olap/storage_types_test.cpp
index c49ae9b..5bd4580 100644
--- a/be/test/olap/storage_types_test.cpp
+++ b/be/test/olap/storage_types_test.cpp
@@ -35,7 +35,7 @@ public:
 
 template<FieldType field_type>
 void common_test(typename TypeTraits<field_type>::CppType src_val) {
-    TypeInfo* type = get_type_info(field_type);
+    TypeInfo* type = get_scalar_type_info(field_type);
 
     ASSERT_EQ(field_type, type->type());
     ASSERT_EQ(sizeof(src_val), type->size());
@@ -148,6 +148,83 @@ TEST(TypesTest, copy_and_equal) {
     common_test<OLAP_FIELD_TYPE_VARCHAR>(slice);
 }
 
+template<FieldType item_type> // item_type: element type of the array column built for this test
+void common_test_array(Collection src_val) { // verifies that deep_copy and direct_copy of src_val both compare equal to src_val
+    TabletColumn list_column(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY); // parent column of type ARRAY
+    int32 item_length = 0; // stays 0 unless the item type is CHAR or VARCHAR
+    if (item_type == OLAP_FIELD_TYPE_CHAR || item_type == OLAP_FIELD_TYPE_VARCHAR) {
+        item_length = 10;
+    }
+    TabletColumn item_column(OLAP_FIELD_AGGREGATION_NONE, item_type, true, 0, item_length); // element column; NOTE(review): meaning of 'true, 0' not visible here — confirm against TabletColumn ctor
+    list_column.add_sub_column(item_column); // attach the element column as the array's sub column
+
+    auto* array_type = dynamic_cast<ArrayTypeInfo*>(get_type_info(&list_column)); // NOTE(review): cast result is dereferenced below without a null check
+
+    ASSERT_EQ(item_type, array_type->item_type_info()->type()); // the array type info must report the requested item type
+
+    { // test deep copy: deep_copy is handed a MemPool; the copy must be equal() and cmp() == 0 against the source
+        Collection dst_val;
+        auto tracker = std::make_shared<MemTracker>();
+        MemPool pool(tracker.get());
+        array_type->deep_copy((char*)&dst_val, (char*)&src_val, &pool);
+        ASSERT_TRUE(array_type->equal((char*)&src_val, (char*)&dst_val));
+        ASSERT_EQ(0, array_type->cmp((char*)&src_val, (char*)&dst_val));
+    }
+    { // test direct copy: destination is backed by the local stack buffers declared below
+        bool null_signs[50]; // room for 50 null flags
+        uint8_t data[50]; // room for 50 bytes of item data
+        Collection dst_val(data, sizeof(null_signs), null_signs); // second argument is sizeof(null_signs), not the source length
+        array_type->direct_copy((char*)&dst_val, (char*)&src_val);
+        ASSERT_TRUE(array_type->equal((char*)&src_val, (char*)&dst_val));
+        ASSERT_EQ(0, array_type->cmp((char*)&src_val, (char*)&dst_val));
+    }
+}
+
+
+TEST(ArrayTypeTest, copy_and_equal) { // runs common_test_array once per item type, each time with a 3-element Collection
+    bool bool_array[3] = {true, false, true};
+    bool null_signs[3] = {true, true, true}; // shared by every Collection below; NOTE(review): if true marks an element as null, all items are null and their values may never be compared — confirm
+    common_test_array<OLAP_FIELD_TYPE_BOOL>(Collection(bool_array, 3, null_signs));
+
+    uint8_t tiny_int_array[3] = {3, 4, 5};
+    common_test_array<OLAP_FIELD_TYPE_TINYINT>(Collection(tiny_int_array, 3, null_signs));
+
+    int16_t small_int_array[3] = {123, 234, 345};
+    common_test_array<OLAP_FIELD_TYPE_SMALLINT>(Collection(small_int_array, 3, null_signs));
+
+    int32_t int_array[3] = {-123454321, 123454321, 323412343};
+    common_test_array<OLAP_FIELD_TYPE_INT>(Collection(int_array, 3, null_signs));
+
+    uint32_t uint_array[3] = {123454321, 2342341, 52435234};
+    common_test_array<OLAP_FIELD_TYPE_UNSIGNED_INT>(Collection(uint_array, 3, null_signs));
+
+    int64_t bigint_array[3] = {123454321123456789L, 23534543234L, -123454321123456789L};
+    common_test_array<OLAP_FIELD_TYPE_BIGINT>(Collection(bigint_array, 3, null_signs));
+
+    __int128 large_int_array[3] = {1234567899L, 1234567899L, -12345631899L};
+    common_test_array<OLAP_FIELD_TYPE_LARGEINT>(Collection(large_int_array, 3, null_signs));
+
+    float float_array[3] = {1.11, 2.22, -3.33};
+    common_test_array<OLAP_FIELD_TYPE_FLOAT>(Collection(float_array, 3, null_signs));
+
+    double double_array[3] = {12221.11, 12221.11, -12221.11};
+    common_test_array<OLAP_FIELD_TYPE_DOUBLE>(Collection(double_array, 3, null_signs));
+
+    decimal12_t decimal_array[3] = {{123, 234}, {345, 453}, {4524, 2123}};
+    common_test_array<OLAP_FIELD_TYPE_DECIMAL>(Collection(decimal_array, 3, null_signs));
+
+    uint24_t date_array[3] = {(1988 << 9) | (2 << 5) | 1, (1998 << 9) | (2 << 5) | 1, (2008 << 9) | (2 << 5) | 1}; // presumably packed as (year << 9) | (month << 5) | day — confirm
+    common_test_array<OLAP_FIELD_TYPE_DATE>(Collection(date_array, 3, null_signs));
+
+    int64_t datetime_array[3] = {19880201010203L, 19980201010203L, 20080204010203L};
+    common_test_array<OLAP_FIELD_TYPE_DATETIME>(Collection(datetime_array, 3, null_signs));
+
+
+    Slice char_array[3] = {"12345abcde", "12345abcde", "asdf322"}; // the same slices are reused for the CHAR and VARCHAR cases
+    common_test_array<OLAP_FIELD_TYPE_CHAR>(Collection(char_array, 3, null_signs));
+    common_test_array<OLAP_FIELD_TYPE_VARCHAR>(Collection(char_array, 3, null_signs));
+}
+
 } // namespace doris
 
 int main(int argc, char **argv) {
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 89f7dbb..789e45b 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -257,7 +257,7 @@ message ColumnPB {
     optional string referenced_column = 14; // ColumnMessage.referenced_column?
     optional bool has_bitmap_index = 15 [default=false]; // ColumnMessage.has_bitmap_index
     optional bool visible = 16 [default=true];
-
+    repeated ColumnPB children_columns = 17;
 }
 
 message TabletSchemaPB {
diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto
index 584d409..90aefec 100644
--- a/gensrc/proto/segment_v2.proto
+++ b/gensrc/proto/segment_v2.proto
@@ -151,6 +151,11 @@ message ColumnMetaPB {
     repeated ColumnIndexMetaPB indexes = 8;
     // pointer to dictionary page when using DICT_ENCODING
     optional PagePointerPB dict_page = 9;
+
+    repeated ColumnMetaPB children_columns = 10;
+
+    // Required by array/struct/map readers to create their child readers.
+    optional uint64 num_rows = 11;
 }
 
 message SegmentFooterPB {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org