You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to dev@doris.apache.org by GitBox <gi...@apache.org> on 2018/11/15 19:15:58 UTC

[GitHub] doris-ci closed pull request #311: Transform row-oriented table to columnar-oriented table

doris-ci closed pull request #311: Transform row-oriented table to columnar-oriented table
URL: https://github.com/apache/incubator-doris/pull/311
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff after such a merge, the diff is
reproduced below for the sake of provenance:

Because this is a foreign pull request (one opened from a fork), GitHub
would not otherwise display its diff, so the diff is supplied below:

diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index 53e1153a..78be097b 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -35,7 +35,7 @@
 #include "runtime/vectorized_row_batch.h"
 
 #include "olap/delete_handler.h"
-#include "olap/i_data.h"
+#include "olap/column_data.h"
 #include "olap/olap_cond.h"
 #include "olap/olap_engine.h"
 #include "olap/reader.h"
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 774ecaa0..d6dd7c34 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -24,67 +24,63 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/olap")
 add_library(Olap STATIC
     aggregate_func.cpp
     base_compaction.cpp
+    bit_field_reader.cpp
+    bit_field_writer.cpp
+    bloom_filter.hpp
+    bloom_filter_reader.cpp
+    bloom_filter_writer.cpp
+    byte_buffer.cpp
+    column_data.cpp
+    column_reader.cpp
+    column_writer.cpp
     comparison_predicate.cpp
+    compress.cpp
     cumulative_compaction.cpp
-    delta_writer.cpp
+    data_writer.cpp
     delete_handler.cpp
+    delta_writer.cpp
     field.cpp
     field_info.cpp
     file_helper.cpp
+    file_stream.cpp
     hll.cpp
-    i_data.cpp
     in_list_predicate.cpp
+    in_stream.cpp
     lru_cache.cpp
     memtable.cpp
     merger.cpp
     new_status.cpp
     null_predicate.cpp
     olap_cond.cpp
-    olap_data.cpp
     olap_engine.cpp
     olap_header.cpp
+    olap_header_manager.cpp
     olap_index.cpp
+    olap_meta.cpp
     olap_server.cpp
     olap_snapshot.cpp
-    options.cpp
-    store.cpp
     olap_table.cpp
+    options.cpp
+    out_stream.cpp
     push_handler.cpp
     reader.cpp
     row_block.cpp
     row_cursor.cpp
     rowset.cpp
+    run_length_byte_reader.cpp
+    run_length_byte_writer.cpp
+    run_length_integer_reader.cpp
+    run_length_integer_writer.cpp
     schema_change.cpp
+    segment_reader.cpp
+    segment_writer.cpp
+    serialize.cpp
+    store.cpp
+    stream_index_common.cpp
+    stream_index_reader.cpp
+    stream_index_writer.cpp
+    stream_name.cpp
     types.cpp 
     utils.cpp
     wrapper_field.cpp
-    writer.cpp
-    olap_header_manager.cpp
-    olap_meta.cpp
-    column_file/bit_field_reader.cpp
-    column_file/bit_field_writer.cpp
-    column_file/bloom_filter.hpp
-    column_file/bloom_filter_reader.cpp
-    column_file/bloom_filter_writer.cpp
-    column_file/byte_buffer.cpp
-    column_file/column_data.cpp
-    column_file/column_reader.cpp
-    column_file/column_writer.cpp
-    column_file/compress.cpp
-    column_file/data_writer.cpp
-    column_file/file_stream.cpp
-    column_file/in_stream.cpp
-    column_file/out_stream.cpp
-    column_file/run_length_byte_reader.cpp
-    column_file/run_length_byte_writer.cpp
-    column_file/run_length_integer_reader.cpp
-    column_file/run_length_integer_writer.cpp
-    column_file/segment_reader.cpp
-    column_file/segment_writer.cpp
-    column_file/serialize.cpp
-    column_file/stream_index_common.cpp
-    column_file/stream_index_reader.cpp
-    column_file/stream_index_writer.cpp
-    column_file/stream_name.cpp
 )
-
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 12d78aee..59bbf18b 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -25,7 +25,7 @@
 
 #include "olap/delete_handler.h"
 #include "olap/merger.h"
-#include "olap/olap_data.h"
+#include "olap/column_data.h"
 #include "olap/olap_engine.h"
 #include "olap/olap_header.h"
 #include "olap/rowset.h"
@@ -105,7 +105,7 @@ OLAPStatus BaseCompaction::run() {
     OLAP_LOG_TRACE("new_base_version_hash", "%ld", new_base_version_hash);
 
     // 2. 获取生成新base需要的data sources
-    vector<IData*> base_data_sources;
+    vector<ColumnData*> base_data_sources;
     _table->acquire_data_sources_by_versions(_need_merged_versions, &base_data_sources);
     if (base_data_sources.empty()) {
         OLAP_LOG_WARNING("fail to acquire need data sources. [table=%s; version=%d]",
@@ -118,7 +118,7 @@ OLAPStatus BaseCompaction::run() {
     {
         DorisMetrics::base_compaction_deltas_total.increment(_need_merged_versions.size());
         int64_t merge_bytes = 0;
-        for (IData* i_data : base_data_sources) {
+        for (ColumnData* i_data : base_data_sources) {
             merge_bytes += i_data->olap_index()->data_size();
         }
         DorisMetrics::base_compaction_bytes_total.increment(merge_bytes);
@@ -133,7 +133,7 @@ OLAPStatus BaseCompaction::run() {
     res = _do_base_compaction(new_base_version_hash,
                              &base_data_sources,
                              &row_count);
-    // 释放不再使用的IData对象
+    // 释放不再使用的ColumnData对象
     _table->release_data_sources(&base_data_sources);
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("fail to do base version. [table=%s; version=%d]",
@@ -320,7 +320,7 @@ bool BaseCompaction::_check_whether_satisfy_policy(bool is_manual_trigger,
 }
 
 OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash,
-                                               vector<IData*>* base_data_sources,
+                                               vector<ColumnData*>* base_data_sources,
                                                uint64_t* row_count) {
     // 1. 生成新base文件对应的olap index
     Rowset* new_base = new (std::nothrow) Rowset(_table.get(),
@@ -397,7 +397,7 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
 
     // Check row num changes
     uint64_t source_rows = 0;
-    for (IData* i_data : *base_data_sources) {
+    for (ColumnData* i_data : *base_data_sources) {
         source_rows += i_data->olap_index()->num_rows();
     }
     bool row_nums_check = config::row_nums_check;
@@ -530,7 +530,7 @@ OLAPStatus BaseCompaction::_validate_delete_file_action() {
     ReadLock rdlock(_table->get_header_lock_ptr());
     const PDelta* lastest_version = _table->lastest_version();
     Version test_version = Version(0, lastest_version->end_version());
-    vector<IData*> test_sources;
+    vector<ColumnData*> test_sources;
     _table->acquire_data_sources(test_version, &test_sources);
 
     if (test_sources.size() == 0) {
diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h
index a92b13dc..72bf4052 100644
--- a/be/src/olap/base_compaction.h
+++ b/be/src/olap/base_compaction.h
@@ -27,7 +27,7 @@
 
 namespace doris {
 
-class IData;
+class ColumnData;
 class Rowset;
 
 // @brief 实现对START_BASE_COMPACTION命令的处理逻辑,并返回处理结果
@@ -82,14 +82,14 @@ class BaseCompaction {
     // 
     // 输入参数:
     // - new_base_version_hash: 新Base的VersionHash
-    // - base_data_sources: 生成新Base需要的IData*
+    // - base_data_sources: 生成新Base需要的ColumnData*
     // - row_count: 生成Base过程中产生的row_count
     //
     // 返回值:
     // - 如果执行成功,则返回OLAP_SUCCESS;
     // - 其它情况下,返回相应的错误码
     OLAPStatus _do_base_compaction(VersionHash new_base_version_hash,
-                                  std::vector<IData*>* base_data_sources,
+                                  std::vector<ColumnData*>* base_data_sources,
                                   uint64_t* row_count);
    
     // 更新Header使得修改对外可见
diff --git a/be/src/olap/column_file/bit_field_reader.cpp b/be/src/olap/bit_field_reader.cpp
similarity index 93%
rename from be/src/olap/column_file/bit_field_reader.cpp
rename to be/src/olap/bit_field_reader.cpp
index 35792e08..0252ec70 100644
--- a/be/src/olap/column_file/bit_field_reader.cpp
+++ b/be/src/olap/bit_field_reader.cpp
@@ -15,14 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/bit_field_reader.h"
+#include "olap/bit_field_reader.h"
 
-#include "olap/column_file/column_reader.h"
-#include "olap/column_file/in_stream.h"
-#include "olap/column_file/run_length_byte_reader.h"
+#include "olap/column_reader.h"
+#include "olap/in_stream.h"
+#include "olap/run_length_byte_reader.h"
 
 namespace doris {
-namespace column_file {
 
 BitFieldReader::BitFieldReader(ReadOnlyFileStream* input) : 
         _input(input),
@@ -128,6 +127,5 @@ OLAPStatus BitFieldReader::skip(uint64_t num_values) {
     return OLAP_SUCCESS;
 }
 
-}  // namespace column_file
 }  // namespace doris
 
diff --git a/be/src/olap/column_file/bit_field_reader.h b/be/src/olap/bit_field_reader.h
similarity index 94%
rename from be/src/olap/column_file/bit_field_reader.h
rename to be/src/olap/bit_field_reader.h
index 1250e8eb..abc8a47e 100644
--- a/be/src/olap/column_file/bit_field_reader.h
+++ b/be/src/olap/bit_field_reader.h
@@ -18,11 +18,10 @@
 #ifndef DORIS_BE_SRC_OLAP_COLUMN_FILE_BIT_FIELD_READER_H
 #define DORIS_BE_SRC_OLAP_COLUMN_FILE_BIT_FIELD_READER_H
 
-#include "olap/column_file/stream_index_reader.h"
+#include "olap/stream_index_reader.h"
 #include "olap/olap_define.h"
 
 namespace doris {
-namespace column_file {
 
 class ReadOnlyFileStream;
 class RunLengthByteReader;
@@ -49,7 +48,6 @@ class BitFieldReader {
     DISALLOW_COPY_AND_ASSIGN(BitFieldReader);
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_BIT_FIELD_READER_H
diff --git a/be/src/olap/column_file/bit_field_writer.cpp b/be/src/olap/bit_field_writer.cpp
similarity index 96%
rename from be/src/olap/column_file/bit_field_writer.cpp
rename to be/src/olap/bit_field_writer.cpp
index d77484e3..00e11774 100644
--- a/be/src/olap/column_file/bit_field_writer.cpp
+++ b/be/src/olap/bit_field_writer.cpp
@@ -17,10 +17,9 @@
 
 #include "bit_field_writer.h"
 #include <gen_cpp/column_data_file.pb.h>
-#include "olap/column_file/run_length_byte_writer.h"
+#include "olap/run_length_byte_writer.h"
 
 namespace doris {
-namespace column_file {
 
 BitFieldWriter::BitFieldWriter(OutStream* output) : 
         _output(output),
@@ -104,5 +103,4 @@ void BitFieldWriter::get_position(PositionEntryWriter* index_entry) const {
     //                "recorded position count: %d", index_entry->positions_size());
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/bit_field_writer.h b/be/src/olap/bit_field_writer.h
similarity index 94%
rename from be/src/olap/column_file/bit_field_writer.h
rename to be/src/olap/bit_field_writer.h
index 0342bc40..02a6e95a 100644
--- a/be/src/olap/column_file/bit_field_writer.h
+++ b/be/src/olap/bit_field_writer.h
@@ -18,11 +18,10 @@
 #ifndef DORIS_BE_SRC_OLAP_COLUMN_FILE_BIT_FIELD_WRITER_H
 #define DORIS_BE_SRC_OLAP_COLUMN_FILE_BIT_FIELD_WRITER_H
 
-#include "olap/column_file/stream_index_writer.h"
+#include "olap/stream_index_writer.h"
 #include "olap/olap_define.h"
 
 namespace doris {
-namespace column_file {
 
 class OutStream;
 class RunLengthByteWriter;
@@ -47,7 +46,6 @@ class BitFieldWriter {
     DISALLOW_COPY_AND_ASSIGN(BitFieldWriter);
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_BIT_FIELD_WRITER_H
diff --git a/be/src/olap/column_file/bloom_filter.hpp b/be/src/olap/bloom_filter.hpp
similarity index 99%
rename from be/src/olap/column_file/bloom_filter.hpp
rename to be/src/olap/bloom_filter.hpp
index 6777d77f..a3a04442 100644
--- a/be/src/olap/column_file/bloom_filter.hpp
+++ b/be/src/olap/bloom_filter.hpp
@@ -28,7 +28,6 @@
 #include "util/hash_util.hpp"
 
 namespace doris {
-namespace column_file {
 
 static const uint64_t DEFAULT_SEED = 104729;
 static const uint64_t BLOOM_FILTER_NULL_HASHCODE = 2862933555777941757ULL;
@@ -297,7 +296,6 @@ class BloomFilter {
     uint32_t _hash_function_num;
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_BLOOM_FILTER_HPP
diff --git a/be/src/olap/column_file/bloom_filter_reader.cpp b/be/src/olap/bloom_filter_reader.cpp
similarity index 95%
rename from be/src/olap/column_file/bloom_filter_reader.cpp
rename to be/src/olap/bloom_filter_reader.cpp
index 63177636..dc6ce1de 100644
--- a/be/src/olap/column_file/bloom_filter_reader.cpp
+++ b/be/src/olap/bloom_filter_reader.cpp
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/bloom_filter_reader.h"
+#include "olap/bloom_filter_reader.h"
 
 namespace doris {
-namespace column_file {
 
 BloomFilterIndexReader::~BloomFilterIndexReader() {
     _entry.reset();
@@ -65,5 +64,4 @@ size_t BloomFilterIndexReader::entry_count() {
     return _entry_count;
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/bloom_filter_reader.h b/be/src/olap/bloom_filter_reader.h
similarity index 94%
rename from be/src/olap/column_file/bloom_filter_reader.h
rename to be/src/olap/bloom_filter_reader.h
index a2c6a541..10d581a4 100644
--- a/be/src/olap/column_file/bloom_filter_reader.h
+++ b/be/src/olap/bloom_filter_reader.h
@@ -20,11 +20,10 @@
 
 #include <vector>
 
-#include "olap/column_file/bloom_filter.hpp"
-#include "olap/column_file/bloom_filter_writer.h"
+#include "olap/bloom_filter.hpp"
+#include "olap/bloom_filter_writer.h"
 
 namespace doris {
-namespace column_file {
 
 // Each bloom filter index contains mutiple bloom filter entries,
 //     each of which is related to a data block.
@@ -70,6 +69,5 @@ class BloomFilterIndexReader {
     BloomFilter _entry;
 };
 
-} // namespace column_file
 } // namespace doris
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_BLOOM_FILTER_READER_H
diff --git a/be/src/olap/column_file/bloom_filter_writer.cpp b/be/src/olap/bloom_filter_writer.cpp
similarity index 97%
rename from be/src/olap/column_file/bloom_filter_writer.cpp
rename to be/src/olap/bloom_filter_writer.cpp
index e226aff4..884fc896 100644
--- a/be/src/olap/column_file/bloom_filter_writer.cpp
+++ b/be/src/olap/bloom_filter_writer.cpp
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/bloom_filter_writer.h"
+#include "olap/bloom_filter_writer.h"
 
 #include <vector>
 
 namespace doris {
-namespace column_file {
 
 BloomFilterIndexWriter::~BloomFilterIndexWriter() {
     for (std::vector<BloomFilter*>::iterator it = _bloom_filters.begin();
@@ -107,5 +106,4 @@ OLAPStatus BloomFilterIndexWriter::write_to_buffer(char* buffer, size_t buffer_s
     return res;
 }
 
-} // namespace column_file
 } // namespace doris
diff --git a/be/src/olap/column_file/bloom_filter_writer.h b/be/src/olap/bloom_filter_writer.h
similarity index 91%
rename from be/src/olap/column_file/bloom_filter_writer.h
rename to be/src/olap/bloom_filter_writer.h
index b2189a5d..c66ff31c 100644
--- a/be/src/olap/column_file/bloom_filter_writer.h
+++ b/be/src/olap/bloom_filter_writer.h
@@ -20,11 +20,10 @@
 
 #include <vector>
 
-#include "olap/column_file/bloom_filter.hpp"
-#include "olap/column_file/out_stream.h"
+#include "olap/bloom_filter.hpp"
+#include "olap/out_stream.h"
 
 namespace doris {
-namespace column_file {
 
 class BloomFilterIndexWriter {
 public:
@@ -41,6 +40,5 @@ class BloomFilterIndexWriter {
     BloomFilterIndexHeader _header;
 };
 
-} // namespace column_file
 } // namespace doris
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_BLOOM_FILTER_WRITER_H
diff --git a/be/src/olap/column_file/byte_buffer.cpp b/be/src/olap/byte_buffer.cpp
similarity index 99%
rename from be/src/olap/column_file/byte_buffer.cpp
rename to be/src/olap/byte_buffer.cpp
index 7f9671e5..6dbc4867 100644
--- a/be/src/olap/column_file/byte_buffer.cpp
+++ b/be/src/olap/byte_buffer.cpp
@@ -20,7 +20,6 @@
 #include "olap/utils.h"
 
 namespace doris {
-namespace column_file {
 
 ByteBuffer::ByteBuffer() : 
         _array(NULL),
@@ -194,5 +193,4 @@ OLAPStatus ByteBuffer::put(const char* src, uint64_t src_size, uint64_t offset,
     return OLAP_SUCCESS;
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/byte_buffer.h b/be/src/olap/byte_buffer.h
similarity index 99%
rename from be/src/olap/column_file/byte_buffer.h
rename to be/src/olap/byte_buffer.h
index 24166051..85a4d812 100644
--- a/be/src/olap/column_file/byte_buffer.h
+++ b/be/src/olap/byte_buffer.h
@@ -25,7 +25,6 @@
 #include "util/mem_util.hpp"
 
 namespace doris {
-namespace column_file {
 
 // ByteBuffer是用于数据缓存的一个类
 // ByteBuffer内部维护一个char数组用于缓存数据;
@@ -226,7 +225,6 @@ class ByteBuffer {
     bool _is_mmap;
 };
 
-}  // namespace column_file
 }  // namespace doris
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_BYTE_BUFFER_H
 
diff --git a/be/src/olap/column_file/column_data.cpp b/be/src/olap/column_data.cpp
similarity index 89%
rename from be/src/olap/column_file/column_data.cpp
rename to be/src/olap/column_data.cpp
index a9f2a2cf..09d133ed 100644
--- a/be/src/olap/column_file/column_data.cpp
+++ b/be/src/olap/column_data.cpp
@@ -17,16 +17,37 @@
 
 #include "column_data.h"
 
-#include "olap/column_file/segment_reader.h"
+#include "olap/segment_reader.h"
 #include "olap/olap_cond.h"
 #include "olap/olap_table.h"
 #include "olap/row_block.h"
 
 namespace doris {
-namespace column_file {
 
-ColumnData::ColumnData(Rowset* olap_index) :
-        IData(COLUMN_ORIENTED_FILE, olap_index),
+ColumnData* ColumnData::create(Rowset* index) {
+    ColumnData* data = NULL;
+    DataFileType file_type = index->table()->data_file_type();
+
+    switch (file_type) {
+    case COLUMN_ORIENTED_FILE:
+        data = new(std::nothrow) ColumnData(index);
+        break;
+
+    default:
+        LOG(WARNING) << "unknown data file type. type=" << DataFileType_Name(file_type).c_str();
+    }
+
+    return data;
+}
+
+ColumnData::ColumnData(Rowset* olap_index)
+      : _data_file_type(COLUMN_ORIENTED_FILE),
+        _olap_index(olap_index),
+        _eof(false),
+        _conditions(NULL),
+        _col_predicates(NULL),
+        _delete_status(DEL_NOT_SATISFIED),
+        _runtime_state(NULL),
         _is_using_cache(false),
         _segment_reader(NULL) {
     _table = olap_index->table();
@@ -40,7 +61,7 @@ ColumnData::~ColumnData() {
 
 OLAPStatus ColumnData::init() {
     _olap_index->acquire();
-    
+
     auto res = _short_key_cursor.init(_olap_index->short_key_fields());
     if (res != OLAP_SUCCESS) {
         LOG(WARNING) << "key cursor init failed, table:" << _table->id()
@@ -344,7 +365,7 @@ OLAPStatus ColumnData::prepare_block_read(
     if (end_key != nullptr) {
         auto res = _seek_to_row(*end_key, find_end_key, true);
         if (res == OLAP_SUCCESS) {
-            // we find a 
+            // we find a
             _end_segment = _current_segment;
             _end_block = _current_block;
             _end_row_index = _read_block->pos();
@@ -519,6 +540,73 @@ OLAPStatus ColumnData::get_next_row_block(RowBlock** row_block) {
     return OLAP_SUCCESS;
 }
 
+bool ColumnData::delta_pruning_filter() {
+    if (empty() || zero_num_rows()) {
+        return true;
+    }
+
+    if (!_olap_index->has_column_statistics()) {
+        return false;
+    }
+
+    return _conditions->delta_pruning_filter(_olap_index->get_column_statistics());
+}
+
+int ColumnData::delete_pruning_filter() {
+    if (empty() || zero_num_rows()) {
+        // should return DEL_NOT_SATISFIED, because that when creating rollup table,
+        // the delete version file should preserved for filter data.
+        return DEL_NOT_SATISFIED;
+    }
+
+    if (false == _olap_index->has_column_statistics()) {
+        /*
+         * if olap_index has no column statistics, we cannot judge whether the data can be filtered or not
+         */
+        return DEL_PARTIAL_SATISFIED;
+    }
+
+    /*
+     * the relationship between delete condition A and B is A || B.
+     * if any delete condition is satisfied, the data can be filtered.
+     * elseif all delete condition is not satifsified, the data can't be filtered.
+     * else is the partial satisfied.
+    */
+    int ret = DEL_PARTIAL_SATISFIED;
+    bool del_partial_stastified = false;
+    bool del_stastified = false;
+    for (auto& delete_condtion : _delete_handler.get_delete_conditions()) {
+        if (delete_condtion.filter_version <= _olap_index->version().first) {
+            continue;
+        }
+
+        Conditions* del_cond = delete_condtion.del_cond;
+        int del_ret = del_cond->delete_pruning_filter(_olap_index->get_column_statistics());
+        if (DEL_SATISFIED == del_ret) {
+            del_stastified = true;
+            break;
+        } else if (DEL_PARTIAL_SATISFIED == del_ret) {
+            del_partial_stastified = true;
+        } else {
+            continue;
+        }
+    }
+
+    if (true == del_stastified) {
+        ret = DEL_SATISFIED;
+    } else if (true == del_partial_stastified) {
+        ret = DEL_PARTIAL_SATISFIED;
+    } else {
+        ret = DEL_NOT_SATISFIED;
+    }
+
+    return ret;
+}
+
+uint64_t ColumnData::get_filted_rows() {
+    return _stats->rows_del_filtered;
+}
+
 OLAPStatus ColumnData::_schema_change_init() {
     _is_using_cache = false;
 
@@ -629,9 +717,4 @@ OLAPStatus ColumnData::_get_block(bool without_filter) {
     return OLAP_SUCCESS;
 }
 
-uint64_t ColumnData::get_filted_rows() {
-    return _stats->rows_del_filtered;
-}
-
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/column_data.h b/be/src/olap/column_data.h
similarity index 71%
rename from be/src/olap/column_file/column_data.h
rename to be/src/olap/column_data.h
index 2c3e0ade..5865e53a 100644
--- a/be/src/olap/column_file/column_data.h
+++ b/be/src/olap/column_data.h
@@ -18,36 +18,62 @@
 #ifndef DORIS_BE_SRC_OLAP_COLUMN_FILE_COLUMN_DATA_H
 #define DORIS_BE_SRC_OLAP_COLUMN_FILE_COLUMN_DATA_H
 
+#include <string>
 #include <string>
 
-#include "olap/i_data.h"
+#include "gen_cpp/olap_file.pb.h"
+#include "olap/column_predicate.h"
+#include "olap/delete_handler.h"
+#include "olap/olap_common.h"
+#include "olap/olap_cond.h"
+#include "olap/rowset.h"
 #include "olap/row_block.h"
 #include "olap/row_cursor.h"
+#include "util/runtime_profile.h"
 
 namespace doris {
 
 class OLAPTable;
 
-namespace column_file {
 
 class SegmentReader;
 
 // This class is column data reader. this class will be used in two case.
-class ColumnData : public IData {
+class ColumnData {
 public:
+    static ColumnData* create(Rowset* olap_index);
     explicit ColumnData(Rowset* olap_index);
-    virtual ~ColumnData();
+    ~ColumnData();
+
+    // 为了与之前兼容, 暴露部分index的接口
+    Version version() const {
+        return _olap_index->version();
+    }
+    VersionHash version_hash() const {
+        return _olap_index->version_hash();
+    }
+    bool delete_flag() const {
+        return _olap_index->delete_flag();
+    }
+    uint32_t num_segments() const {
+        return _olap_index->num_segments();
+    }
+
+    // 查询数据文件类型
+    DataFileType data_file_type() {
+        return _data_file_type;
+    }
 
-    virtual OLAPStatus init();
+    OLAPStatus init();
 
     OLAPStatus prepare_block_read(
             const RowCursor* start_key, bool find_start_key,
             const RowCursor* end_key, bool find_end_key,
-            RowBlock** first_block) override;
+            RowBlock** first_block);
 
-    OLAPStatus get_next_block(RowBlock** row_block) override;
+    OLAPStatus get_next_block(RowBlock** row_block);
 
-    virtual void set_read_params(
+    void set_read_params(
             const std::vector<uint32_t>& return_columns,
             const std::set<uint32_t>& load_bf_columns,
             const Conditions& conditions,
@@ -57,16 +83,41 @@ class ColumnData : public IData {
             bool is_using_cache,
             RuntimeState* runtime_state);
 
-    virtual OLAPStatus get_first_row_block(RowBlock** row_block);
-    virtual OLAPStatus get_next_row_block(RowBlock** row_block);
-
-    OLAPStatus pickle() override { return OLAP_SUCCESS; }
-    OLAPStatus unpickle() override { return OLAP_SUCCESS; }
+    OLAPStatus get_first_row_block(RowBlock** row_block);
+    OLAPStatus get_next_row_block(RowBlock** row_block);
 
     // Only used to binary search in full-key find row
     const RowCursor* seek_and_get_current_row(const RowBlockPosition& position);
 
-    virtual uint64_t get_filted_rows();
+
+    void set_stats(OlapReaderStatistics* stats) {
+        _stats = stats;
+    }
+
+    void set_delete_handler(const DeleteHandler& delete_handler) {
+        _delete_handler = delete_handler;
+    }
+
+    void set_delete_status(const DelCondSatisfied delete_status) {
+        _delete_status = delete_status;
+    }
+
+    // 开放接口查询_eof,让外界知道数据读取是否正常终止
+    // 因为这个函数被频繁访问, 从性能考虑, 放在基类而不是虚函数
+    bool eof() { return _eof; }
+    void set_eof(bool eof) { _eof = eof; }
+    bool* eof_ptr() { return &_eof; }
+
+    bool empty() const { return _olap_index->empty(); }
+    bool zero_num_rows() const { return _olap_index->zero_num_rows(); }
+
+    bool delta_pruning_filter();
+    int delete_pruning_filter();
+    uint64_t get_filted_rows();
+
+    Rowset* olap_index() const { return _olap_index; }
+    void set_olap_index(Rowset* olap_index) { _olap_index = olap_index; }
+    int64_t num_rows() const { return _olap_index->num_rows(); }
 
 private:
     DISALLOW_COPY_AND_ASSIGN(ColumnData);
@@ -108,6 +159,18 @@ class ColumnData : public IData {
         return &_cursor;
     }
 private:
+    DataFileType _data_file_type;
+    Rowset* _olap_index;
+    // 当到达文件末尾或者到达end key时设置此标志
+    bool _eof;
+    const Conditions* _conditions;
+    const std::vector<ColumnPredicate*>* _col_predicates;
+    DeleteHandler _delete_handler;
+    DelCondSatisfied _delete_status;
+    RuntimeState* _runtime_state;
+    OlapReaderStatistics _owned_stats;
+    OlapReaderStatistics* _stats = &_owned_stats;
+
     OLAPTable* _table;
     // whether in normal read, use return columns to load block
     bool _is_normal_read = false;
@@ -191,7 +254,6 @@ class ColumnDataComparator {
     const Rowset* _index;
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_COLUMN_DATA_H
diff --git a/be/src/olap/column_file/column_reader.cpp b/be/src/olap/column_reader.cpp
similarity index 99%
rename from be/src/olap/column_file/column_reader.cpp
rename to be/src/olap/column_reader.cpp
index 8b091dcd..ff5c37d3 100644
--- a/be/src/olap/column_file/column_reader.cpp
+++ b/be/src/olap/column_reader.cpp
@@ -17,13 +17,12 @@
 
 #include <cstring>
 
-#include "olap/column_file/bit_field_reader.h"
-#include "olap/column_file/column_reader.h"
-#include "olap/column_file/file_stream.h"
+#include "olap/bit_field_reader.h"
+#include "olap/column_reader.h"
+#include "olap/file_stream.h"
 #include "olap/olap_define.h"
 
 namespace doris {
-namespace column_file {
 IntegerColumnReader::IntegerColumnReader(uint32_t column_unique_id): 
         _eof(false),
         _column_unique_id(column_unique_id),
@@ -726,7 +725,7 @@ ColumnReader* ColumnReader::create(uint32_t column_id,
                     column_id, column_unique_id, field_info.length, dictionary_size);
         } else {
             OLAP_LOG_WARNING("known encoding format. data may be generated by higher version,"
-                    "try updating olapengine binary to solve this problem");
+                    "try updating olap/ngine binary to solve this problem");
             // TODO. define a new return code
             return NULL;
         }
@@ -765,7 +764,7 @@ ColumnReader* ColumnReader::create(uint32_t column_id,
                     column_id, column_unique_id, field_info.length, dictionary_size);
         } else {
             OLAP_LOG_WARNING("known encoding format. data may be generated by higher version, "
-                    "try updating olapengine binary to solve this problem");
+                    "try updating olap/ngine binary to solve this problem");
             // TODO. define a new return code
             return NULL;
         }
@@ -1355,5 +1354,4 @@ OLAPStatus LargeIntColumnReader::next_vector(
     return res;
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/column_reader.h b/be/src/olap/column_reader.h
similarity index 99%
rename from be/src/olap/column_file/column_reader.h
rename to be/src/olap/column_reader.h
index 80ec6234..e84de2af 100644
--- a/be/src/olap/column_file/column_reader.h
+++ b/be/src/olap/column_reader.h
@@ -18,11 +18,11 @@
 #ifndef DORIS_BE_SRC_OLAP_COLUMN_FILE_COLUMN_READER_H
 #define DORIS_BE_SRC_OLAP_COLUMN_FILE_COLUMN_READER_H
 
-#include "olap/column_file/byte_buffer.h"
-#include "olap/column_file/file_stream.h"
-#include "olap/column_file/run_length_byte_reader.h"
-#include "olap/column_file/run_length_integer_reader.h"
-#include "olap/column_file/stream_name.h"
+#include "olap/byte_buffer.h"
+#include "olap/file_stream.h"
+#include "olap/run_length_byte_reader.h"
+#include "olap/run_length_integer_reader.h"
+#include "olap/stream_name.h"
 #include "olap/field.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
@@ -31,7 +31,6 @@
 #include "util/date_func.h"
 
 namespace doris {
-namespace column_file {
 
 class StreamName;
 class ReadOnlyFileStream;
@@ -920,7 +919,6 @@ typedef IntegerColumnReaderWrapper<uint24_t, false> DateColumnReader;
 // 内部使用LONG实现
 typedef IntegerColumnReaderWrapper<uint64_t, false> DateTimeColumnReader;
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_COLUMN_READER_H
diff --git a/be/src/olap/column_file/column_writer.cpp b/be/src/olap/column_writer.cpp
similarity index 99%
rename from be/src/olap/column_file/column_writer.cpp
rename to be/src/olap/column_writer.cpp
index c636a618..a95f5390 100755
--- a/be/src/olap/column_file/column_writer.cpp
+++ b/be/src/olap/column_writer.cpp
@@ -15,13 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/column_writer.h"
+#include "olap/column_writer.h"
 
-#include "olap/column_file/bit_field_writer.h"
+#include "olap/bit_field_writer.h"
 #include "olap/file_helper.h"
 
 namespace doris {
-namespace column_file {
 
 ColumnWriter* ColumnWriter::create(uint32_t column_id,
         const std::vector<FieldInfo>& columns,
@@ -963,5 +962,4 @@ void LargeIntColumnWriter::record_position() {
     _low_writer->get_position(index_entry(), false);
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/column_writer.h b/be/src/olap/column_writer.h
similarity index 98%
rename from be/src/olap/column_file/column_writer.h
rename to be/src/olap/column_writer.h
index 111853d5..82b36412 100644
--- a/be/src/olap/column_file/column_writer.h
+++ b/be/src/olap/column_writer.h
@@ -22,12 +22,12 @@
 
 #include <map>
 
-#include "olap/column_file/bloom_filter.hpp"
-#include "olap/column_file/bloom_filter_writer.h"
-#include "olap/column_file/out_stream.h"
-#include "olap/column_file/stream_index_writer.h"
-#include "olap/column_file/run_length_byte_writer.h"
-#include "olap/column_file/run_length_integer_writer.h"
+#include "olap/bloom_filter.hpp"
+#include "olap/bloom_filter_writer.h"
+#include "olap/out_stream.h"
+#include "olap/stream_index_writer.h"
+#include "olap/run_length_byte_writer.h"
+#include "olap/run_length_integer_writer.h"
 #include "olap/field.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
@@ -35,7 +35,6 @@
 #include "olap/row_cursor.h"
 
 namespace doris {
-namespace column_file {
 
 class OutStream;
 class OutStreamFactory;
@@ -648,6 +647,5 @@ class LargeIntColumnWriter : public ColumnWriter {
     DISALLOW_COPY_AND_ASSIGN(LargeIntColumnWriter);
 };
 
-}  // namespace column_file
 }  // namespace doris
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_COLUMN_WRITER_H
diff --git a/be/src/olap/column_file/compress.cpp b/be/src/olap/compress.cpp
similarity index 96%
rename from be/src/olap/column_file/compress.cpp
rename to be/src/olap/compress.cpp
index e3890176..11700c47 100644
--- a/be/src/olap/column_file/compress.cpp
+++ b/be/src/olap/compress.cpp
@@ -17,11 +17,10 @@
 
 #include "compress.h"
 
-#include "olap/column_file/byte_buffer.h"
+#include "olap/byte_buffer.h"
 #include "olap/utils.h"
 
 namespace doris {
-namespace column_file {
 
 OLAPStatus lzo_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller) {
     size_t out_length = 0;
@@ -99,5 +98,4 @@ OLAPStatus lz4_decompress(ByteBuffer* in, ByteBuffer* out) {
     return res;
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/compress.h b/be/src/olap/compress.h
similarity index 97%
rename from be/src/olap/column_file/compress.h
rename to be/src/olap/compress.h
index 73370618..cf772006 100644
--- a/be/src/olap/column_file/compress.h
+++ b/be/src/olap/compress.h
@@ -21,7 +21,6 @@
 #include "olap/olap_define.h"
 
 namespace doris {
-namespace column_file {
 
 class ByteBuffer;
 
@@ -50,6 +49,5 @@ OLAPStatus lzo_decompress(ByteBuffer* in, ByteBuffer* out);
 OLAPStatus lz4_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller);
 OLAPStatus lz4_decompress(ByteBuffer* in, ByteBuffer* out);
 
-}  // namespace column_file
 }  // namespace doris
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_COMPRESS_H
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index b12d6c85..aee0239f 100755
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -127,7 +127,7 @@ OLAPStatus CumulativeCompaction::run() {
     {
         DorisMetrics::cumulative_compaction_deltas_total.increment(_need_merged_versions.size());
         int64_t merge_bytes = 0;
-        for (IData* i_data : _data_source) {
+        for (ColumnData* i_data : _data_source) {
             merge_bytes += i_data->olap_index()->data_size();
         }
         DorisMetrics::cumulative_compaction_bytes_total.increment(merge_bytes);
@@ -396,7 +396,7 @@ OLAPStatus CumulativeCompaction::_do_cumulative_compaction() {
 
     // Check row num changes
     uint64_t source_rows = 0;
-    for (IData* i_data : _data_source) {
+    for (ColumnData* i_data : _data_source) {
         source_rows += i_data->olap_index()->num_rows();
     }
     bool row_nums_check = config::row_nums_check;
@@ -513,7 +513,7 @@ bool CumulativeCompaction::_validate_need_merged_versions() {
 OLAPStatus CumulativeCompaction::_validate_delete_file_action() {
     // 1. acquire the new cumulative version to make sure that all is right after deleting files
     Version test_version = Version(0, _cumulative_version.second);
-    vector<IData*> test_sources;
+    vector<ColumnData*> test_sources;
     _table->acquire_data_sources(test_version, &test_sources);
     if (test_sources.size() == 0) {
         OLAP_LOG_WARNING("acquire data source failed. [test_verison=%d-%d]",
diff --git a/be/src/olap/cumulative_compaction.h b/be/src/olap/cumulative_compaction.h
index ae3b3ac5..fe2bbd8f 100755
--- a/be/src/olap/cumulative_compaction.h
+++ b/be/src/olap/cumulative_compaction.h
@@ -24,7 +24,7 @@
 #include <vector>
 
 #include "olap/merger.h"
-#include "olap/olap_data.h"
+#include "olap/column_data.h"
 #include "olap/olap_define.h"
 #include "olap/olap_table.h"
 
@@ -172,7 +172,7 @@ class CumulativeCompaction {
     // 新cumulative文件对应的olap index
     Rowset* _new_cumulative_index;
     // 可合并的delta文件的data文件
-    std::vector<IData*> _data_source;
+    std::vector<ColumnData*> _data_source;
     // 可合并的delta文件的版本
     std::vector<Version> _need_merged_versions;
 
diff --git a/be/src/olap/column_file/data_writer.cpp b/be/src/olap/data_writer.cpp
similarity index 77%
rename from be/src/olap/column_file/data_writer.cpp
rename to be/src/olap/data_writer.cpp
index 4eeafe5c..a2a5ebca 100644
--- a/be/src/olap/column_file/data_writer.cpp
+++ b/be/src/olap/data_writer.cpp
@@ -15,20 +15,37 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/data_writer.h"
+#include "olap/data_writer.h"
 
 #include <math.h>
 
-#include "olap/column_file/segment_writer.h"
+#include "olap/segment_writer.h"
 #include "olap/rowset.h"
 #include "olap/row_block.h"
 
 
 namespace doris {
-namespace column_file {
+
+ColumnDataWriter* ColumnDataWriter::create(OLAPTablePtr table, Rowset *index, bool is_push_write) {
+    ColumnDataWriter* writer = NULL;
+    switch (table->data_file_type()) {
+    case COLUMN_ORIENTED_FILE:
+        writer = new (std::nothrow) ColumnDataWriter(table, index, is_push_write);
+        break;
+    default:
+        LOG(WARNING) << "unknown data file type. type=" << DataFileType_Name(table->data_file_type());
+        break;
+    }
+
+    return writer;
+}
 
 ColumnDataWriter::ColumnDataWriter(OLAPTablePtr table, Rowset* index, bool is_push_write)
-    : IWriter(is_push_write, table),
+    : _is_push_write(is_push_write),
+      _table(table),
+      _column_statistics(_table->num_key_fields(),
+                         std::pair<WrapperField*, WrapperField*>(NULL, NULL)),
+      _row_index(0),
       _index(index),
       _row_block(NULL),
       _segment_writer(NULL),
@@ -43,6 +60,10 @@ ColumnDataWriter::ColumnDataWriter(OLAPTablePtr table, Rowset* index, bool is_pu
 }
 
 ColumnDataWriter::~ColumnDataWriter() {
+    for (size_t i = 0; i < _column_statistics.size(); ++i) {
+        SAFE_DELETE(_column_statistics[i].first);
+        SAFE_DELETE(_column_statistics[i].second);
+    }
     SAFE_DELETE(_row_block);
     SAFE_DELETE(_segment_writer);
 }
@@ -50,10 +71,15 @@ ColumnDataWriter::~ColumnDataWriter() {
 OLAPStatus ColumnDataWriter::init() {
     OLAPStatus res = OLAP_SUCCESS;
 
-    res = IWriter::init();
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("fail to init res. [res=%d]", res);
-        return res;
+    for (size_t i = 0; i < _column_statistics.size(); ++i) {
+        _column_statistics[i].first = WrapperField::create(_table->tablet_schema()[i]);
+        DCHECK(_column_statistics[i].first != nullptr) << "fail to create column statistics field.";
+        _column_statistics[i].first->set_to_max();
+
+        _column_statistics[i].second = WrapperField::create(_table->tablet_schema()[i]);
+        DCHECK(_column_statistics[i].second != nullptr) << "fail to create column statistics field.";
+        _column_statistics[i].second->set_null();
+        _column_statistics[i].second->set_to_min();
     }
 
     double size = static_cast<double>(_table->segment_size());
@@ -128,6 +154,37 @@ OLAPStatus ColumnDataWriter::write(const char* row) {
     return OLAP_SUCCESS;
 }
 
+
+void ColumnDataWriter::next(const RowCursor& row_cursor) {
+    for (size_t i = 0; i < _table->num_key_fields(); ++i) {
+        char* right = row_cursor.get_field_by_index(i)->get_field_ptr(row_cursor.get_buf());
+        if (_column_statistics[i].first->cmp(right) > 0) {
+            _column_statistics[i].first->copy(right);
+        }
+
+        if (_column_statistics[i].second->cmp(right) < 0) {
+            _column_statistics[i].second->copy(right);
+        }
+    }
+
+    ++_row_index;
+}
+
+void ColumnDataWriter::next(const char* row, const Schema* schema) {
+    for (size_t i = 0; i < _table->num_key_fields(); ++i) {
+        char* right = const_cast<char*>(row + schema->get_col_offset(i));
+        if (_column_statistics[i].first->cmp(right) > 0) {
+            _column_statistics[i].first->copy(right);
+        }
+
+        if (_column_statistics[i].second->cmp(right) < 0) {
+            _column_statistics[i].second->copy(right);
+        }
+    }
+
+    ++_row_index;
+}
+
 OLAPStatus ColumnDataWriter::finalize() {
     if (_all_num_rows == 0 && _row_index == 0) {
         _index->set_empty(true);
@@ -270,6 +327,5 @@ MemPool* ColumnDataWriter::mem_pool() {
     return _row_block->mem_pool();
 }
 
-}  // namespace column_file
 }  // namespace doris
 
diff --git a/be/src/olap/column_file/data_writer.h b/be/src/olap/data_writer.h
similarity index 68%
rename from be/src/olap/column_file/data_writer.h
rename to be/src/olap/data_writer.h
index 74081a92..c9d072d4 100644
--- a/be/src/olap/column_file/data_writer.h
+++ b/be/src/olap/data_writer.h
@@ -18,25 +18,30 @@
 #ifndef DORIS_BE_SRC_OLAP_COLUMN_FILE_DATA_WRITER_H
 #define DORIS_BE_SRC_OLAP_COLUMN_FILE_DATA_WRITER_H
 
+#include "olap/olap_table.h"
 #include "olap/row_block.h"
-#include "olap/writer.h"
+#include "olap/schema.h"
+#include "olap/wrapper_field.h"
 
 namespace doris {
 class RowBlock;
-namespace column_file {
 class SegmentWriter;
 
-// 列文件格式的Writer,接口参考IWriter中的定义
-class ColumnDataWriter : public IWriter {
+class ColumnDataWriter {
 public:
+    // Factory function
+    // 调用者获得新建的对象, 并负责delete释放
+    static ColumnDataWriter* create(OLAPTablePtr table, Rowset* index, bool is_push_write);
     ColumnDataWriter(OLAPTablePtr table, Rowset* index, bool is_push_write);
-    virtual ~ColumnDataWriter();
-    virtual OLAPStatus init();
-    virtual OLAPStatus attached_by(RowCursor* row_cursor);
-    virtual OLAPStatus write(const char* row);
-    virtual OLAPStatus finalize();
-    virtual uint64_t written_bytes();
-    virtual MemPool* mem_pool();
+    ~ColumnDataWriter();
+    OLAPStatus init();
+    OLAPStatus attached_by(RowCursor* row_cursor);
+    OLAPStatus write(const char* row);
+    void next(const RowCursor& row_cursor);
+    void next(const char* row, const Schema* schema);
+    OLAPStatus finalize();
+    uint64_t written_bytes();
+    MemPool* mem_pool();
 private:
     OLAPStatus _add_segment();
     OLAPStatus _flush_segment_with_verfication();
@@ -44,6 +49,12 @@ class ColumnDataWriter : public IWriter {
     OLAPStatus _flush_row_block(bool finalize);
     OLAPStatus _init_segment();
 
+    bool _is_push_write;
+    OLAPTablePtr _table;
+    // first is min, second is max
+    std::vector<std::pair<WrapperField*, WrapperField*>> _column_statistics;
+    uint32_t _row_index;
+
     Rowset* _index;
     RowBlock* _row_block;      // 使用RowBlcok缓存要写入的数据
     RowCursor _cursor;
@@ -54,11 +65,8 @@ class ColumnDataWriter : public IWriter {
     uint32_t _segment;
     int64_t _all_num_rows;
     bool _new_segment_created;
-
-    DISALLOW_COPY_AND_ASSIGN(ColumnDataWriter);
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_DATA_WRITER_H
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 5258a7f1..ce13cf5e 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -113,7 +113,7 @@ OLAPStatus DeltaWriter::init() {
     // New Writer to write data into Rowset
     VLOG(3) << "init writer. table=" << _table->full_name() << ", "
             << "block_row_size=" << _table->num_rows_per_row_block();
-    _writer = IWriter::create(_table, _cur_rowset, true);
+    _writer = ColumnDataWriter::create(_table, _cur_rowset, true);
     DCHECK(_writer != nullptr) << "memory error occur when creating writer";
 
     const std::vector<SlotDescriptor*>& slots = _req.tuple_desc->slots();
@@ -153,7 +153,7 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
         _rowset_vec.push_back(_cur_rowset);
 
         SAFE_DELETE(_writer);
-        _writer = IWriter::create(_table, _cur_rowset, true);
+        _writer = ColumnDataWriter::create(_table, _cur_rowset, true);
         DCHECK(_writer != nullptr) << "memory error occur when creating writer";
 
         SAFE_DELETE(_mem_table);
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 9611dcc1..013df405 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -22,7 +22,7 @@
 #include "olap/olap_engine.h"
 #include "olap/olap_table.h"
 #include "olap/schema_change.h"
-#include "olap/writer.h"
+#include "olap/data_writer.h"
 #include "runtime/descriptors.h"
 #include "runtime/tuple.h"
 #include "gen_cpp/internal_service.pb.h"
@@ -70,7 +70,7 @@ class DeltaWriter {
     std::vector<Rowset*> _rowset_vec;
     std::vector<Rowset*> _new_rowset_vec;
     OLAPTablePtr _new_table;
-    IWriter* _writer;
+    ColumnDataWriter* _writer;
     MemTable* _mem_table;
     Schema* _schema;
     std::vector<FieldInfo>* _field_infos;
diff --git a/be/src/olap/column_file/file_stream.cpp b/be/src/olap/file_stream.cpp
similarity index 97%
rename from be/src/olap/column_file/file_stream.cpp
rename to be/src/olap/file_stream.cpp
index a72b4351..1ab586e9 100755
--- a/be/src/olap/column_file/file_stream.cpp
+++ b/be/src/olap/file_stream.cpp
@@ -15,13 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/file_stream.h"
+#include "olap/file_stream.h"
 
-#include "olap/column_file/byte_buffer.h"
-#include "olap/column_file/out_stream.h"
+#include "olap/byte_buffer.h"
+#include "olap/out_stream.h"
 
 namespace doris {
-namespace column_file {
 
 ReadOnlyFileStream::ReadOnlyFileStream(
         FileHandler* handler,
@@ -197,5 +196,4 @@ uint64_t ReadOnlyFileStream::available() {
     return _file_cursor.remain();
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/file_stream.h b/be/src/olap/file_stream.h
similarity index 98%
rename from be/src/olap/column_file/file_stream.h
rename to be/src/olap/file_stream.h
index 7dbfe4a2..7e167d40 100755
--- a/be/src/olap/column_file/file_stream.h
+++ b/be/src/olap/file_stream.h
@@ -25,15 +25,14 @@
 #include <streambuf>
 #include <vector>
 
-#include "olap/column_file/byte_buffer.h"
-#include "olap/column_file/compress.h"
-#include "olap/column_file/stream_index_reader.h"
+#include "olap/byte_buffer.h"
+#include "olap/compress.h"
+#include "olap/stream_index_reader.h"
 #include "olap/file_helper.h"
 #include "olap/olap_common.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
-namespace column_file {
 
 // 定义输入数据流接口
 class ReadOnlyFileStream {
@@ -314,7 +313,6 @@ inline OLAPStatus ReadOnlyFileStream::read_all(char* buffer, uint64_t* buffer_si
     return res;
 }
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_FILE_STREAM_H
diff --git a/be/src/olap/i_data.cpp b/be/src/olap/i_data.cpp
deleted file mode 100644
index d297f1e2..00000000
--- a/be/src/olap/i_data.cpp
+++ /dev/null
@@ -1,110 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "olap/i_data.h"
-
-#include "olap/column_file/column_data.h"
-#include "olap/olap_data.h"
-#include "olap/rowset.h"
-
-namespace doris {
-
-IData* IData::create(Rowset* index) {
-    IData* data = NULL;
-    DataFileType file_type = index->table()->data_file_type();
-
-    switch (file_type) {
-    case OLAP_DATA_FILE:
-        data = new(std::nothrow) OLAPData(index);
-        break;
-
-    case COLUMN_ORIENTED_FILE:
-        data = new(std::nothrow) column_file::ColumnData(index);
-        break;
-
-    default:
-        OLAP_LOG_WARNING("unknown data file type. [type=%s]",
-                         DataFileType_Name(file_type).c_str());
-    }
-
-    return data;
-}
-
-bool IData::delta_pruning_filter() {
-    if (empty() || zero_num_rows()) {
-        return true;
-    }
-            
-    if (!_olap_index->has_column_statistics()) {
-        return false;
-    }
-    
-    return _conditions->delta_pruning_filter(_olap_index->get_column_statistics());
-}
-
-int IData::delete_pruning_filter() {
-    if (empty() || zero_num_rows()) {
-        // should return DEL_NOT_SATISFIED, because that when creating rollup table,
-        // the delete version file should preserved for filter data.
-        return DEL_NOT_SATISFIED;
-    }
-
-    if (false == _olap_index->has_column_statistics()) {
-        /*
-         * if olap_index has no column statistics, we cannot judge whether the data can be filtered or not
-         */
-        return DEL_PARTIAL_SATISFIED;
-    }
-
-    /*
-     * the relationship between delete condition A and B is A || B.
-     * if any delete condition is satisfied, the data can be filtered.
-     * elseif all delete condition is not satifsified, the data can't be filtered.
-     * else is the partial satisfied.
-    */
-    int ret = DEL_PARTIAL_SATISFIED;
-    bool del_partial_stastified = false;
-    bool del_stastified = false;
-    for (auto& delete_condtion : _delete_handler.get_delete_conditions()) {
-        if (delete_condtion.filter_version <= _olap_index->version().first) {
-            continue;
-        }
-
-        Conditions* del_cond = delete_condtion.del_cond;
-        int del_ret = del_cond->delete_pruning_filter(_olap_index->get_column_statistics());
-        if (DEL_SATISFIED == del_ret) {
-            del_stastified = true;
-            break;
-        } else if (DEL_PARTIAL_SATISFIED == del_ret) {
-            del_partial_stastified = true;
-        } else {
-            continue;
-        }
-    }
-
-    if (true == del_stastified) {
-        ret = DEL_SATISFIED;
-    } else if (true == del_partial_stastified) {
-        ret = DEL_PARTIAL_SATISFIED;
-    } else {
-        ret = DEL_NOT_SATISFIED;
-    }
-
-    return ret;
-}
-
-}  // namespace doris
diff --git a/be/src/olap/i_data.h b/be/src/olap/i_data.h
deleted file mode 100644
index 00ecf112..00000000
--- a/be/src/olap/i_data.h
+++ /dev/null
@@ -1,203 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_SRC_OLAP_I_DATA_H
-#define DORIS_BE_SRC_OLAP_I_DATA_H
-
-#include <string>
-#include <vector>
-
-#include "gen_cpp/olap_file.pb.h"
-#include "olap/delete_handler.h"
-#include "olap/olap_common.h"
-#include "olap/olap_cond.h"
-#include "olap/rowset.h"
-#include "util/runtime_profile.h"
-
-#include "olap/column_predicate.h"
-
-namespace doris {
-
-class OLAPTable;
-class Rowset;
-class RowBlock;
-class RowCursor;
-class Conditions;
-class RuntimeState;
-
-// 抽象数据访问接口
-// 提供对不同数据文件类型的统一访问接口
-class IData {
-public:
-    // 工厂方法, 生成IData对象, 调用者获得新建的对象, 并负责delete释放
-    static IData* create(Rowset* olap_index);
-    virtual ~IData() {}
-
-    // 为了与之前兼容, 暴露部分index的接口
-    Version version() const {
-        return _olap_index->version();
-    }
-    VersionHash version_hash() const {
-        return _olap_index->version_hash();
-    }
-    bool delete_flag() const {
-        return _olap_index->delete_flag();
-    }
-    uint32_t num_segments() const {
-        return _olap_index->num_segments();
-    }
-
-    // 查询数据文件类型
-    DataFileType data_file_type() {
-        return _data_file_type;
-    }
-
-    // 下面这些函数的注释见OLAPData的注释
-    virtual OLAPStatus init() = 0;
-
-    // Prepre to read data from this data, after seek, block is set to the first block
-    // If start_key is nullptr, we start read from start
-    // If there is no data to read in rang (start_key, end_key), block is set to nullptr
-    // and return OLAP_ERR_DATA_EOF
-    virtual OLAPStatus prepare_block_read(
-        const RowCursor* start_key, bool find_start_key,
-        const RowCursor* end_key, bool find_end_key,
-        RowBlock** block) = 0;
-
-    // This is called after prepare_block_read, used to get next next row block if exist,
-    // 'block' is set to next block. If there is no more block, 'block' is set to nullptr
-    // with OLAP_ERR_DATA_EOF returned
-    virtual OLAPStatus get_next_block(RowBlock** row_block) = 0;
-
-    // 下面两个接口用于schema_change.cpp, 我们需要改功能继续做roll up,
-    // 所以继续暴露该接口
-    virtual OLAPStatus get_first_row_block(RowBlock** row_block) = 0;
-    virtual OLAPStatus get_next_row_block(RowBlock** row_block) = 0;
-
-    // 设置读取数据的参数, 这是一个后加入的接口, IData的实现可以根据这个接口提供
-    // 信息做更多的优化. OLAPData不需要这个接口, ColumnData通过这个接口获取更多
-    // 的上层信息以减少不必须要的数据读取.
-    // Input:
-    //   returns_columns - 设置RowCursor需要返回的列
-    //   conditions - 设置查询的过滤条件
-    //   begin_keys - 查询会使用的begin keys
-    //   end_keys - 查询会使用的end keys
-    virtual void set_read_params(
-            const std::vector<uint32_t>& return_columns,
-            const std::set<uint32_t>& load_bf_columns,
-            const Conditions& conditions,
-            const std::vector<ColumnPredicate*>& col_predicates,
-            const std::vector<RowCursor*>& start_keys,
-            const std::vector<RowCursor*>& end_keys,
-            bool is_using_cache,
-            RuntimeState* runtime_state) {
-        _conditions = &conditions;
-        _col_predicates = &col_predicates;
-        _runtime_state = runtime_state;
-    }
-
-    void set_stats(OlapReaderStatistics* stats) {
-        _stats = stats;
-    }
-
-    virtual void set_delete_handler(const DeleteHandler& delete_handler) {
-        _delete_handler = delete_handler;
-    }
-
-    virtual void set_delete_status(const DelCondSatisfied delete_status) {
-        _delete_status = delete_status;
-    }
-
-    // 开放接口查询_eof,让外界知道数据读取是否正常终止
-    // 因为这个函数被频繁访问, 从性能考虑, 放在基类而不是虚函数
-    bool eof() {
-        return _eof;
-    }
-
-    void set_eof(bool eof) {
-        _eof = eof;
-    }
-
-    bool* eof_ptr() {
-        return &_eof;
-    }
-
-    bool empty() const {
-        return _olap_index->empty();
-    }
-
-    bool zero_num_rows() const {
-        return _olap_index->zero_num_rows();
-    }
-
-    bool delta_pruning_filter();
-
-    int delete_pruning_filter();
-
-    virtual uint64_t get_filted_rows() {
-        return 0;
-    }
-
-    Rowset* olap_index() const {
-        return _olap_index;
-    }
-
-    void set_olap_index(Rowset* olap_index) {
-        _olap_index = olap_index;
-    }
-
-    int64_t num_rows() const {
-        return _olap_index->num_rows();
-    }
-
-    // pickle接口
-    virtual OLAPStatus pickle() = 0;
-    virtual OLAPStatus unpickle() = 0;
-
-protected:
-    // 基类必须指定data_file_type, 也必须关联一个Rowset
-    IData(DataFileType data_file_type, Rowset* olap_index):
-        _data_file_type(data_file_type),
-        _olap_index(olap_index),
-        _eof(false),
-        _conditions(NULL),
-        _col_predicates(NULL),
-        _delete_status(DEL_NOT_SATISFIED),
-        _runtime_state(NULL) {
-    }
-
-protected:
-    DataFileType _data_file_type;
-    Rowset* _olap_index;
-    // 当到达文件末尾或者到达end key时设置此标志
-    bool _eof;
-    const Conditions* _conditions;
-    const std::vector<ColumnPredicate*>* _col_predicates;
-    DeleteHandler _delete_handler;
-    DelCondSatisfied _delete_status;
-    RuntimeState* _runtime_state;
-    OlapReaderStatistics _owned_stats;
-    OlapReaderStatistics* _stats = &_owned_stats;
-
-private:
-    DISALLOW_COPY_AND_ASSIGN(IData);
-};
-
-}  // namespace doris
-
-#endif // DORIS_BE_SRC_OLAP_I_DATA_H
-
diff --git a/be/src/olap/column_file/in_stream.cpp b/be/src/olap/in_stream.cpp
similarity index 98%
rename from be/src/olap/column_file/in_stream.cpp
rename to be/src/olap/in_stream.cpp
index 5c646692..92ab4a9b 100644
--- a/be/src/olap/column_file/in_stream.cpp
+++ b/be/src/olap/in_stream.cpp
@@ -15,13 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/in_stream.h"
+#include "olap/in_stream.h"
 
-#include "olap/column_file/byte_buffer.h"
-#include "olap/column_file/out_stream.h"
+#include "olap/byte_buffer.h"
+#include "olap/out_stream.h"
 
 namespace doris {
-namespace column_file {
 
 InStream::InStream(
         std::vector<ByteBuffer*>* inputs,
@@ -313,5 +312,4 @@ OLAPStatus InStream::skip(uint64_t skip_length) {
     return res;
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/in_stream.h b/be/src/olap/in_stream.h
similarity index 97%
rename from be/src/olap/column_file/in_stream.h
rename to be/src/olap/in_stream.h
index a97617ba..f2f742e8 100644
--- a/be/src/olap/column_file/in_stream.h
+++ b/be/src/olap/in_stream.h
@@ -25,13 +25,12 @@
 #include <streambuf>
 #include <vector>
 
-#include "olap/column_file/byte_buffer.h"
-#include "olap/column_file/compress.h"
-#include "olap/column_file/stream_index_reader.h"
+#include "olap/byte_buffer.h"
+#include "olap/compress.h"
+#include "olap/stream_index_reader.h"
 #include "olap/olap_common.h"
 
 namespace doris {
-namespace column_file {
 
 // 提供Column Reader的Seek位置, 由于ColumnReader的seek需要多个position地址
 // PositionProvider 提供了next方法, 将对position的操作封装为stack的形式
@@ -200,7 +199,6 @@ inline OLAPStatus InStream::read(char* buffer, uint64_t* buf_size) {
     return res;
 }
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_IN_STREAM_H
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index c2128788..0f62e05e 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -18,7 +18,7 @@
 #include "olap/memtable.h"
 
 #include "olap/hll.h"
-#include "olap/writer.h"
+#include "olap/data_writer.h"
 #include "olap/row_cursor.h"
 #include "util/runtime_profile.h"
 #include "util/debug_util.h"
@@ -143,7 +143,7 @@ void MemTable::insert(Tuple* tuple) {
     }
 }
 
-OLAPStatus MemTable::flush(IWriter* writer) {
+OLAPStatus MemTable::flush(ColumnDataWriter* writer) {
     Table::Iterator it(_skip_list);
     for (it.SeekToFirst(); it.Valid(); it.Next()) {
         const char* row = it.key();
@@ -156,7 +156,7 @@ OLAPStatus MemTable::flush(IWriter* writer) {
     return OLAP_SUCCESS;
 }
 
-OLAPStatus MemTable::close(IWriter* writer) {
+OLAPStatus MemTable::close(ColumnDataWriter* writer) {
     return flush(writer);
 }
 
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 074e242a..4c74557c 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -26,7 +26,7 @@
 
 namespace doris {
 
-class IWriter;
+class ColumnDataWriter;
 class RowCursor;
 
 class MemTable {
@@ -37,8 +37,8 @@ class MemTable {
     ~MemTable();
     size_t memory_usage();
     void insert(Tuple* tuple);
-    OLAPStatus flush(IWriter* writer);
-    OLAPStatus close(IWriter* writer);
+    OLAPStatus flush(ColumnDataWriter* writer);
+    OLAPStatus close(ColumnDataWriter* writer);
 private:
     Schema* _schema;
     std::vector<FieldInfo>* _field_infos;
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index b4bae264..48456ef0 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -20,20 +20,19 @@
 #include <memory>
 #include <vector>
 
-#include "olap/i_data.h"
+#include "olap/column_data.h"
 #include "olap/olap_define.h"
 #include "olap/rowset.h"
 #include "olap/olap_table.h"
 #include "olap/reader.h"
 #include "olap/row_cursor.h"
-#include "olap/writer.h"
+#include "olap/data_writer.h"
 
 using std::list;
 using std::string;
 using std::unique_ptr;
 using std::vector;
 
-
 namespace doris {
 
 Merger::Merger(OLAPTablePtr table, Rowset* index, ReaderType type) : 
@@ -42,7 +41,7 @@ Merger::Merger(OLAPTablePtr table, Rowset* index, ReaderType type) :
         _reader_type(type),
         _row_count(0) {}
 
-OLAPStatus Merger::merge(const vector<IData*>& olap_data_arr,
+OLAPStatus Merger::merge(const vector<ColumnData*>& olap_data_arr,
                          uint64_t* merged_rows, uint64_t* filted_rows) {
     // Create and initiate reader for scanning and multi-merging specified
     // OLAPDatas.
@@ -63,7 +62,7 @@ OLAPStatus Merger::merge(const vector<IData*>& olap_data_arr,
     }
 
     // create and initiate writer for generating new index and data files.
-    unique_ptr<IWriter> writer(IWriter::create(_table, _index, false));
+    unique_ptr<ColumnDataWriter> writer(ColumnDataWriter::create(_table, _index, false));
 
     if (NULL == writer) {
         OLAP_LOG_WARNING("fail to allocate writer.");
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index 79261912..4f587aaf 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -24,7 +24,7 @@
 namespace doris {
 
 class Rowset;
-class IData;
+class ColumnData;
 
 class Merger {
 public:
@@ -36,7 +36,7 @@ class Merger {
     // @brief read from multiple OLAPData and Rowset, then write into single OLAPData and Rowset
     // @return  OLAPStatus: OLAP_SUCCESS or FAIL
     // @note it will take long time to finish.
-    OLAPStatus merge(const std::vector<IData*>& olap_data_arr, 
+    OLAPStatus merge(const std::vector<ColumnData*>& olap_data_arr, 
                      uint64_t* merged_rows, uint64_t* filted_rows);
 
     // 获取在做merge过程中累积的行数
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index cf13bb30..f4a5e5ec 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -29,7 +29,7 @@
 #include <unordered_map>
 #include <unordered_set>
 
-#include "gen_cpp/Types_types.h" 
+#include "gen_cpp/Types_types.h"
 #include "olap/olap_define.h"
 
 namespace doris {
diff --git a/be/src/olap/olap_cond.cpp b/be/src/olap/olap_cond.cpp
index cbf36e80..e107b9ef 100644
--- a/be/src/olap/olap_cond.cpp
+++ b/be/src/olap/olap_cond.cpp
@@ -31,7 +31,7 @@ using std::pair;
 using std::string;
 using std::vector;
 
-using doris::column_file::ColumnStatistics;
+using doris::ColumnStatistics;
 
 //此文件主要用于对用户发送的查询条件和删除条件进行处理,逻辑上二者都可以分为三层
 //Condtiion->Condcolumn->Cond
@@ -425,7 +425,7 @@ int Cond::del_eval(const std::pair<WrapperField*, WrapperField*>& stat) const {
     return ret;
 }
 
-bool Cond::eval(const column_file::BloomFilter& bf) const {
+bool Cond::eval(const BloomFilter& bf) const {
     //通过单列上BloomFilter对block进行过滤。
     switch (op) {
     case OP_EQ: {
@@ -543,7 +543,7 @@ int CondColumn::del_eval(const std::pair<WrapperField*, WrapperField*>& statisti
     return ret;
 }
 
-bool CondColumn::eval(const column_file::BloomFilter& bf) const {
+bool CondColumn::eval(const BloomFilter& bf) const {
     //通过一列上的所有BloomFilter索引信息对block进行过滤
     for (auto& each_cond : _conds) {
         if (!each_cond->eval(bf)) {
diff --git a/be/src/olap/olap_cond.h b/be/src/olap/olap_cond.h
index 7ec84104..8d923e9a 100644
--- a/be/src/olap/olap_cond.h
+++ b/be/src/olap/olap_cond.h
@@ -25,8 +25,8 @@
 #include <vector>
 
 #include "gen_cpp/column_data_file.pb.h"
-#include "olap/column_file/bloom_filter.hpp"
-#include "olap/column_file/stream_index_common.h"
+#include "olap/bloom_filter.hpp"
+#include "olap/stream_index_common.h"
 #include "olap/field.h"
 #include "olap/olap_table.h"
 #include "olap/row_cursor.h"
@@ -76,7 +76,7 @@ struct Cond {
     bool eval(const std::pair<WrapperField*, WrapperField*>& statistic) const;
     int del_eval(const std::pair<WrapperField*, WrapperField*>& stat) const;
 
-    bool eval(const column_file::BloomFilter& bf) const;
+    bool eval(const BloomFilter& bf) const;
     
     CondOp op;
     // valid when op is not OP_IN
@@ -106,7 +106,7 @@ class CondColumn {
     bool eval(const std::pair<WrapperField*, WrapperField*>& statistic) const;
     int del_eval(const std::pair<WrapperField*, WrapperField*>& statistic) const;
 
-    bool eval(const column_file::BloomFilter& bf) const;
+    bool eval(const BloomFilter& bf) const;
 
     inline bool is_key() const {
         return _is_key;
diff --git a/be/src/olap/olap_data.cpp b/be/src/olap/olap_data.cpp
deleted file mode 100644
index 4c62398a..00000000
--- a/be/src/olap/olap_data.cpp
+++ /dev/null
@@ -1,1108 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "olap/olap_data.h"
-
-#include <fcntl.h>
-#include <sys/mman.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-
-#include <algorithm>
-#include <cstring>
-#include <string>
-#include <vector>
-
-// #include <ul_string.h>
-
-#include "olap/olap_engine.h"
-#include "olap/rowset.h"
-#include "olap/olap_table.h"
-#include "olap/row_block.h"
-#include "olap/row_cursor.h"
-#include "olap/utils.h"
-#include "runtime/runtime_state.h"
-#include "runtime/mem_tracker.h"
-#include "util/mem_util.hpp"
-
-using std::exception;
-using std::lower_bound;
-using std::nothrow;
-using std::string;
-using std::upper_bound;
-using std::vector;
-
-namespace doris {
-
-OLAPData::OLAPData(Rowset* index) :
-        IData(OLAP_DATA_FILE, index),
-        _olap_table(NULL),
-        _is_pickled(true),
-        _session_status(NULL),
-        _row_block_broker(NULL),
-        _write_descriptor(NULL) {
-    _olap_table = index->table();
-}
-
-OLAPData::~OLAPData() {
-    pickle();
-
-    SAFE_DELETE(_session_status);
-
-    // Close file to release resources
-    if (_write_descriptor) {
-        SAFE_DELETE_ARRAY(_write_descriptor->packed_buffer);
-
-        // File would be closed in deconstruction function
-        SAFE_DELETE(_write_descriptor);
-    }
-}
-
-OLAPStatus OLAPData::init() {
-    return unpickle();
-}
-
-OLAPStatus OLAPData::get_first_row_block(RowBlock** row_block,
-                                     const char** packed_row_block,
-                                     uint32_t* packed_row_block_size) {
-    OLAPStatus res = OLAP_SUCCESS;
-    (row_block == NULL || ((*row_block) = NULL));
-
-    set_eof(false);
-    
-    if (!_row_block_broker) {
-        OLAP_LOG_FATAL("using pickled olap data is forbidden.");
-        return OLAP_ERR_NOT_INITED;
-    }
-
-    RowBlockPosition row_block_pos;
-    res = olap_index()->find_first_row_block(&row_block_pos);
-    if (res == OLAP_ERR_INDEX_EOF) {
-        // 为了防止出错,这里还是为rowBlock设下NULL,否则当前一个block被析构,
-        // 下一个版本恰好又是NULL,这个指针就可能变成野指针。
-        (row_block == NULL || ((*row_block) = NULL));
-        set_eof(true);
-        return res;
-    } else if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Fail to find first row block with Rowset.");
-        return res;
-    }
-
-    res = _row_block_broker->change_to(row_block_pos);
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Fail to get row block. "
-                         "[segment=%d, block_size=%d, data_offset=%d, index_offset=%d]",
-                         row_block_pos.segment,
-                         row_block_pos.block_size,
-                         row_block_pos.data_offset,
-                         row_block_pos.index_offset);
-        _check_io_error(res);
-        return res;
-    }
-    _stats->raw_rows_read += _row_block_broker->num_rows();
-
-    (row_block == NULL || (*row_block = _row_block_broker->row_block()));
-    (packed_row_block == NULL || (*packed_row_block = _row_block_broker->packed_row_block()));
-    (packed_row_block_size == NULL
-         || (*packed_row_block_size = _row_block_broker->packed_row_block_size()));
-
-    return res;
-}
-
-OLAPStatus OLAPData::get_next_row_block(RowBlock** row_block,
-                                    const char** packed_row_block,
-                                    uint32_t* packed_row_block_size) {
-    OLAPStatus res = OLAP_SUCCESS;
-    set_eof(false);
-
-    if (!_row_block_broker) {
-        OLAP_LOG_FATAL("using pickled olap data is forbidden.");
-        return OLAP_ERR_NOT_INITED;
-    }
-
-    RowBlockPosition row_block_pos = _row_block_broker->position();
-    res = olap_index()->find_next_row_block(&row_block_pos, eof_ptr());
-    if (eof()) {
-        OLAP_LOG_DEBUG("Got EOF from Rowset. [segment=%d, data_offset=%d]",
-                       row_block_pos.segment,
-                       row_block_pos.data_offset);
-        // 当到达eof的时候不需要把结果带出来
-        (row_block == NULL || (*row_block = NULL));
-        (packed_row_block == NULL || (*packed_row_block = NULL));
-        (packed_row_block_size == NULL || (*packed_row_block_size = 0));
-
-        return res;
-    }
-
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Fail to get next row block. "
-                         "[res=%d, segment=%d, data_offset=%d, index_offset=%d]",
-                         res,
-                         row_block_pos.segment,
-                         row_block_pos.data_offset,
-                         row_block_pos.index_offset);
-        return res;
-    }
-
-    // 块位置越界有两种情况
-    // 1 block_pos大于end_block_pos
-    // 2 block_pos等于end_block_pos,但end_row_index为0,即end_key在
-    // 某个block的首行
-    if (_row_block_broker->get_set_end_row_flag()
-            && (row_block_pos > _row_block_broker->end_block_position()
-                    || (_row_block_broker->end_row_index() == 0
-                            && row_block_pos == _row_block_broker->end_block_position()))) {
-        set_eof(true);
-        OLAP_LOG_TRACE("Over the end row block. [segment=%d, data_offset=%d, index_offset=%d]",
-                       row_block_pos.segment,
-                       row_block_pos.data_offset,
-                       row_block_pos.index_offset);
-
-        return OLAP_ERR_INDEX_EOF;
-    }
-    res = _row_block_broker->change_to(row_block_pos);
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Fail to get row block. "
-                         "[segment=%d, block_size=%d, data_offset=%d, index_offset=%d]",
-                         row_block_pos.segment,
-                         row_block_pos.block_size,
-                         row_block_pos.data_offset,
-                         row_block_pos.index_offset);
-
-        _check_io_error(res);
-        return res;
-    }
-    _stats->raw_rows_read += _row_block_broker->num_rows();
-
-    (row_block == NULL || (*row_block = _row_block_broker->row_block()));
-    (packed_row_block == NULL || (*packed_row_block = _row_block_broker->packed_row_block()));
-    (packed_row_block_size == NULL
-         || (*packed_row_block_size = _row_block_broker->packed_row_block_size()));
-
-    return res;
-}
-
-OLAPStatus OLAPData::get_next_row_block(RowBlock** row_block) {
-    return get_next_row_block(row_block, NULL, NULL);
-}
-
-OLAPStatus OLAPData::get_first_row_block(RowBlock** row_block) {
-    return get_first_row_block(row_block, NULL, NULL);
-}
-
-RowBlock* OLAPData::seek_and_get_row_block(const RowBlockPosition& position) {
-    set_eof(false);
-
-    if (!_row_block_broker) {
-        OLAP_LOG_FATAL("using pickled OLAPData is forbidden.");
-        return NULL;
-    }
-
-    OLAPStatus res = OLAP_SUCCESS;
-    if ((res = _row_block_broker->change_to(position)) != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Fail to get row block. "
-                         "[segment=%d, block_size=%d, data_offset=%d, index_offset=%d]",
-                         position.segment,
-                         position.block_size,
-                         position.data_offset,
-                         position.index_offset);
-
-        _check_io_error(res);
-        return NULL;
-    }
-    _stats->raw_rows_read += _row_block_broker->num_rows();
-
-    return _row_block_broker->row_block();
-}
-
-const RowCursor* OLAPData::get_first_row() {
-    set_eof(false);
-
-    if (!_row_block_broker) {
-        OLAP_LOG_FATAL("using pickled olap data is forbidden.");
-        return NULL;
-    }
-
-    RowBlockPosition row_block_pos;
-    OLAPStatus res = olap_index()->find_first_row_block(&row_block_pos);
-    if (res == OLAP_ERR_INDEX_EOF) {
-        set_eof(true);
-        return NULL;
-    } else if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Fail to find first row block with Rowset.");
-        return NULL;
-    }
-
-    VLOG(3) << "RowBlockPosition='" << row_block_pos.to_string() << "'";
-
-    if ((res = _row_block_broker->change_to(row_block_pos)) != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Fail to get row block. "
-                         "[segment=%d, block_size=%d, data_offset=%d, index_offset=%d]",
-                         row_block_pos.segment,
-                         row_block_pos.block_size,
-                         row_block_pos.data_offset,
-                         row_block_pos.index_offset);
-
-        _check_io_error(res);
-        return NULL;
-    }
-    _stats->raw_rows_read += _row_block_broker->num_rows();
-
-    return _row_block_broker->first();
-}
-
-const RowCursor* OLAPData::get_next_row() {
-    set_eof(false);
-
-    if (!_row_block_broker) {
-        OLAP_LOG_FATAL("using pickled olap data is forbidden.");
-        return NULL;
-    }
-
-    bool end_of_row_block = false;
-    const RowCursor* row_cursor = _row_block_broker->next(&end_of_row_block);
-    if (row_cursor) {
-        return row_cursor;
-    }
-
-    if (end_of_row_block) {
-        // get_next_row_block传了三个NULL,不需要其返回结果
-        if (get_next_row_block(NULL, NULL, NULL) != OLAP_SUCCESS && eof() == false) {
-            OLAP_LOG_WARNING("fail to get next row block.");
-            return NULL;
-        } else if (eof() == true){
-            OLAP_LOG_DEBUG("get next row block got eof.");
-            return NULL;
-        }
-
-        if (_row_block_broker != NULL) {
-            return _row_block_broker->first();
-        }
-    }
-
-    return NULL;
-}
-
-const RowCursor* OLAPData::find_row(const RowCursor& key, bool find_last_key, bool is_end_key) {
-    set_eof(false);
-
-    if (!_row_block_broker) {
-        OLAP_LOG_FATAL("using pickled OLAPData is forbidden.");
-        return NULL;
-    }
-
-    OlapStopWatch time_watch;
-
-    OLAPStatus res = OLAP_SUCCESS;
-    RowCursor helper_cursor;
-    if (helper_cursor.init(olap_index()->short_key_fields()) != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Init helper_cursor fail.");
-        return NULL;
-    }
-
-    // 取得RowBlock范围的起点和终点,在上面做二分
-    // 若find_last_key==false, index找到最后一个<key的block,
-    // 或者说是第一个可能包含>=key的block, 即二分的左边界
-    // 若find_last_key==true, index找到第一个包含>key的block,
-    // 或者说是最后一个可能包含>=key的block, 即二分的右边界
-    // 二分完成后,first_pos与last_pos指向同一个RowBlock
-    RowBlockPosition start_position;
-    RowBlockPosition end_position;
-    res = olap_index()->find_row_block(key, &helper_cursor, false,  &start_position);
-    if (res == OLAP_ERR_INDEX_EOF) {
-        set_eof(true);
-        return NULL;
-    } else if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Find row block failed. [res=%d]", res);
-        return NULL;
-    }
-
-    res = olap_index()->find_row_block(key, &helper_cursor, true, &end_position);
-    if (res == OLAP_ERR_INDEX_EOF) {
-        set_eof(true);
-        return NULL;
-    } else if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("find row block failed. [res=%d]", res);
-        return NULL;
-    }
-
-    if (key.field_count() > _olap_table->num_short_key_fields()) {
-        // helper rowcursor for OLAPDataComparator
-        RowCursor data_helper_cursor;
-        if ((res = data_helper_cursor.init(_olap_table->tablet_schema())) != OLAP_SUCCESS) {
-            OLAP_LOG_FATAL("fail to init row cursor. [res=%d]", res);
-            return NULL;
-        }
-
-        // 调整start_position
-        uint32_t distance = olap_index()->compute_distance(start_position, end_position);
-        BinarySearchIterator it_start(0u);
-        BinarySearchIterator it_end(distance + 1);
-        BinarySearchIterator it_result(0u);
-        OLAPDataComparator comparator(start_position, 
-                                      this, 
-                                      olap_index(), 
-                                      &data_helper_cursor);
-
-        try {
-            if (!find_last_key) {
-                it_result = lower_bound(it_start, it_end, key, comparator);
-            } else {
-                it_result = upper_bound(it_start, it_end, key, comparator);
-            }
-
-            OLAP_LOG_DEBUG("get result iterator. [offset=%u start_pos='%s']",
-                           *it_result,
-                           start_position.to_string().c_str());
-        } catch (exception& e) {
-            OLAP_LOG_FATAL("exception happens when doing seek. [e.what='%s']", e.what());
-            return NULL;
-        }
-
-        // 如果没有找到就肯定没有, 并且一定是eof了
-        // 如果找到了, 那么找到的block里一定有满足条件的记录
-        if (*it_result == *it_end) {
-            OLAP_LOG_DEBUG("key isnot in current data file.");
-            set_eof(true);
-            return NULL;
-        }
-
-        // 设置_position
-        res = olap_index()->advance_row_block(*it_result, &start_position);
-        if (res != OLAP_SUCCESS) {
-            OLAP_LOG_WARNING("fail to advance row block. "
-                             "[res=%d start_position='%s' offset=%u]",
-                             res, start_position.to_string().c_str(), *it_result);
-            return NULL;
-        }
-    }
-
-    bool eof = false;
-    bool data_eof = false;
-    const RowCursor* row_cursor = NULL;
-
-    while (end_position >= start_position && !data_eof) {
-        // 根据pos取到对应的row_block
-        if ((res = _row_block_broker->change_to(start_position)) != OLAP_SUCCESS) {
-            OLAP_LOG_WARNING("Fail to get row block. "
-                             "[segment=%d, block_size=%d, data_offset=%d, index_offset=%d]",
-                             start_position.segment,
-                             start_position.block_size,
-                             start_position.data_offset,
-                             start_position.index_offset);
-
-            _check_io_error(res);
-            return NULL;
-        }
-        _stats->raw_rows_read += _row_block_broker->num_rows();
-
-        // eof代表这一块找完了,仍然没有发现key,但也可能是找到了endkey,也就是说
-        // 这个数据中没有需要的key。
-        row_cursor = _row_block_broker->find_row(key, find_last_key, &eof);
-        if (row_cursor != NULL || (eof && _row_block_broker->is_end_block())) {
-            break;
-        }
-
-        res = olap_index()->find_next_row_block(&start_position, &data_eof);
-        if (res != OLAP_SUCCESS) {
-            break;
-        }
-    }
-
-    OLAPNoticeInfo::add_seek_count();
-    OLAPNoticeInfo::add_seek_time_us(time_watch.get_elapse_time_us());
-
-    if (row_cursor) {
-        set_eof(false);
-        return row_cursor;
-    } else if (eof || data_eof) {
-        // 此处找不到,是由于设置了end_key,超找超过了end_key对应行
-        VLOG(3) << "key can't be found, Search over end_key![key=" << key.to_string() << "]";
-        set_eof(true);
-        return NULL;
-    } else {
-        // 走到这个分支只能说明前面的某个地方有bug
-        OLAP_LOG_FATAL("find null row but data isnot eof! [res=%d]", res);
-        return NULL;
-    }
-}
-
-OLAPStatus OLAPData::pickle() {
-    if (_is_pickled) {
-        return OLAP_SUCCESS;
-    }
-
-    if (_row_block_broker != NULL) {
-        // 保存现场
-        if (_row_block_broker->row_block() != NULL) {
-            if (_session_status == NULL) {
-                _session_status = new(nothrow) SessionStatus();
-                if (_session_status == NULL) {
-                    OLAP_LOG_FATAL("fail to malloc SessionStatus. [size=%ld]",
-                                   sizeof(SessionStatus));
-                    return OLAP_ERR_MALLOC_ERROR;
-                }
-            }
-
-            _session_status->position = _row_block_broker->position();
-            _session_status->row_index = _row_block_broker->row_index();
-            _session_status->end_block_position = _row_block_broker->end_block_position();
-            _session_status->end_row_index = _row_block_broker->end_row_index();
-            _session_status->is_set_end_row = _row_block_broker->get_set_end_row_flag();
-        }
-
-        SAFE_DELETE(_row_block_broker);
-    }
-
-    _is_pickled = true;
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus OLAPData::unpickle() {
-    OLAPStatus res = OLAP_SUCCESS;
-
-    if (!_is_pickled) {
-        return OLAP_SUCCESS;
-    }
-
-    _row_block_broker = new(nothrow) RowBlockBroker(_olap_table, olap_index(), _runtime_state);
-    if (_row_block_broker == NULL) {
-        OLAP_LOG_FATAL("fail to malloc RowBlockBroker. [size=%ld]", sizeof(RowBlockBroker));
-        return OLAP_ERR_MALLOC_ERROR;
-    }
-
-    res = _row_block_broker->init();
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_FATAL("fail to init RowBlockBroker. [res=%d]", res);
-        SAFE_DELETE(_row_block_broker);
-        return OLAP_ERR_INIT_FAILED;
-    }
-
-    // 恢复现场
-    if (_session_status != NULL) {
-        _row_block_broker->set_end_row(_session_status->end_block_position,
-                                       _session_status->end_row_index);
-        _row_block_broker->set_end_row_flag(_session_status->is_set_end_row);
-
-        res = _row_block_broker->change_to(_session_status->position);
-        if (res != OLAP_SUCCESS) {
-            OLAP_LOG_WARNING("fail to get row block. "
-                             "[res=%d segment=%d block_size=%d data_offset=%d index_offset=%d]",
-                             res,
-                             _session_status->position.segment,
-                             _session_status->position.block_size,
-                             _session_status->position.data_offset,
-                             _session_status->position.index_offset);
-
-            SAFE_DELETE(_row_block_broker);
-            _check_io_error(res);
-            return OLAP_ERR_DATA_ROW_BLOCK_ERROR;
-        }
-        _stats->raw_rows_read += _row_block_broker->num_rows();
-
-        _row_block_broker->get_row(_session_status->row_index);
-    }
-
-    _is_pickled = false;
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus OLAPData::add_segment() {
-    // 在这里没做是否封口(finalize_segment)的检查,如果没有封口再调用add_segment会有问题
-    OLAPStatus res = OLAP_SUCCESS;
-    string file_name;
-    OLAPDataHeaderMessage* data_header = NULL;
-
-    if (_write_descriptor == NULL) {
-        if ((_write_descriptor = new(nothrow) WriteDescriptor()) == NULL) {
-            OLAP_LOG_FATAL("fail to malloc FileHandler. [size=%ld]", sizeof(FileHandler));
-            res = OLAP_ERR_MALLOC_ERROR;
-            goto ADD_SEGMENT_ERR;
-        }
-
-        _write_descriptor->packed_buffer =
-                new(nothrow) char[OLAP_DEFAULT_MAX_PACKED_ROW_BLOCK_SIZE];
-        if (_write_descriptor->packed_buffer == NULL) {
-            OLAP_LOG_FATAL("fail to malloc write buffer. [size=%ld]",
-                           sizeof(OLAP_DEFAULT_MAX_PACKED_ROW_BLOCK_SIZE));
-            res = OLAP_ERR_MALLOC_ERROR;
-            goto ADD_SEGMENT_ERR;
-        }
-
-        memset(_write_descriptor->packed_buffer, 0, OLAP_DEFAULT_MAX_PACKED_ROW_BLOCK_SIZE);
-        _write_descriptor->segment = 0;
-    } else {
-        ++_write_descriptor->segment;
-    }
-
-    data_header = _write_descriptor->file_header.mutable_message();
-    data_header->set_segment(_write_descriptor->segment);
-
-    // file for new segment
-    file_name = olap_index()->construct_data_file_path(olap_index()->rowset_id(), _write_descriptor->segment);
-    res = _write_descriptor->file_handle.open_with_mode(
-            file_name, O_CREAT | O_EXCL | O_WRONLY , S_IRUSR | S_IWUSR);
-    if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "Fail to open file. [file_name=" << file_name << "]";
-        goto ADD_SEGMENT_ERR;
-    }
-
-    // 准备FileHeader
-    res = _write_descriptor->file_header.prepare(&(_write_descriptor->file_handle));
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_FATAL("write file header error. [res=%d err=%m]", res);
-        goto ADD_SEGMENT_ERR;
-    }
-
-    // 跳过FileHeader
-    if (_write_descriptor->file_handle.seek(_write_descriptor->file_header.size(),
-                                            SEEK_SET) == -1) {
-        OLAP_LOG_FATAL("lseek header file error. [err=%m]");
-        res = OLAP_ERR_IO_ERROR;
-        goto ADD_SEGMENT_ERR;
-    }
-
-    // 初始化checksum
-    _write_descriptor->checksum = CRC32_INIT;
-    return res;
-
-ADD_SEGMENT_ERR:
-    if (_write_descriptor) {
-        if (_write_descriptor->packed_buffer) {
-            SAFE_DELETE_ARRAY(_write_descriptor->packed_buffer);
-        }
-
-        SAFE_DELETE(_write_descriptor);
-        _write_descriptor = NULL;
-    }
-
-    _check_io_error(res);
-
-    return res;
-}
-
-OLAPStatus OLAPData::add_row_block(RowBlock* row_block,
-                                   uint32_t* start_data_offset,
-                                   uint32_t* end_data_offset) {
-    if (!_write_descriptor) {
-        OLAP_LOG_WARNING("segment should be added before.");
-        return OLAP_ERR_NOT_INITED;
-    }
-
-    OLAPStatus res = OLAP_SUCCESS;
-
-    // 返回RowBlock起始位置的Offset
-    off_t offset = _write_descriptor->file_handle.tell();
-    if (offset == -1) {
-        res = OLAP_ERR_IO_ERROR;
-        _check_io_error(res);
-        return res;
-    }
-
-    (start_data_offset == NULL || (*start_data_offset = static_cast<uint32_t>(offset)));
-
-    size_t packed_size = 0;
-    // 使用LZO1C-99压缩RowBlock
-    if (row_block->serialize_to_row_format(_write_descriptor->packed_buffer,
-                                           OLAP_DEFAULT_MAX_PACKED_ROW_BLOCK_SIZE,
-                                           &packed_size,
-                                           OLAP_COMP_STORAGE) != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Fail to compress row block.");
-        return OLAP_ERR_COMPRESS_ERROR;
-    }
-
-    // RowBlockInfo is valid only after serialize is called
-    const RowBlockInfo& rb_info = row_block->row_block_info();
-    RowBlockHeaderV2 row_block_header;
-    row_block_header.packed_len = static_cast<uint32_t>(packed_size);
-    row_block_header.num_rows = rb_info.row_num;
-    row_block_header.checksum = rb_info.checksum;
-    row_block_header.magic_num = 0;
-    row_block_header.version = 1;
-    row_block_header.unpacked_len = rb_info.unpacked_len;
-    
-    res = _write_descriptor->file_handle.write(&row_block_header, sizeof(row_block_header));
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Fail to dump row block header. [size=%lu]", sizeof(row_block_header));
-        return res;
-    }
-    
-    res = _write_descriptor->file_handle.write(_write_descriptor->packed_buffer, packed_size);
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Fail to dump row block data. [size=%lu]", packed_size);
-        return res;
-    }
-
-    // 更新SegmentHeader的校验码,计算RowBlock数据部分
-    _write_descriptor->checksum = olap_crc32(_write_descriptor->checksum,
-                                             _write_descriptor->packed_buffer,
-                                             packed_size);
-    // 返回RowBlock结束位置的Offset
-    offset = _write_descriptor->file_handle.tell();
-    if (offset == -1) {
-        res = OLAP_ERR_IO_ERROR;
-        _check_io_error(res);
-        return res;
-    }
-
-    (end_data_offset == NULL || (*end_data_offset = static_cast<uint32_t>(offset)));
-
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus OLAPData::finalize_segment(uint32_t* data_offset) {
-    if (!_write_descriptor) {
-        OLAP_LOG_WARNING("segment should be added before.");
-        return OLAP_ERR_NOT_INITED;
-    }
-
-    OLAPStatus res;
-    off_t file_length = _write_descriptor->file_handle.tell();
-    if (file_length == -1) {
-        res = OLAP_ERR_IO_ERROR;
-        goto FINALIZE_SEGMENT_ERROR;
-    }
-
-    // 返回写入的总大小
-    (data_offset == NULL || (*data_offset = static_cast<uint32_t>(file_length)));
-
-    _write_descriptor->file_header.set_file_length(file_length);
-    _write_descriptor->file_header.set_checksum(_write_descriptor->checksum);
-
-    // 写入更新之后的FileHeader
-    res = _write_descriptor->file_header.serialize(&(_write_descriptor->file_handle));
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_FATAL("write file header error. [err=%m]");
-        goto FINALIZE_SEGMENT_ERROR;
-    }
-
-    OLAP_LOG_DEBUG("finalize_segment. [file_name='%s' file_size=%ld]",
-                   _write_descriptor->file_handle.file_name().c_str(),
-                   file_length);
-    
-    res = _write_descriptor->file_handle.close();
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_FATAL("close file header error. [res=%d err=%m]", res);
-        goto FINALIZE_SEGMENT_ERROR;
-    }
-
-    return OLAP_SUCCESS;
-
-FINALIZE_SEGMENT_ERROR:
-    _check_io_error(res);
-
-    return res;
-}
-
-void OLAPData::sync() {
-    if (_write_descriptor->file_handle.sync() == -1) {
-        OLAP_LOG_WARNING("fail to sync file.[err=%m]");
-        _check_io_error(OLAP_ERR_IO_ERROR);
-    }
-}
-
-OLAPStatus OLAPData::set_end_key(const RowCursor* end_key, bool find_last_end_key) {
-    _row_block_broker->set_end_row_flag(false);
-
-    if (end_key == NULL) {
-        return OLAP_SUCCESS;
-    }
-
-    if (find_row(*end_key, find_last_end_key, true) == NULL) {
-        OLAP_LOG_TRACE("End key can't be found, Search until EOF![end_key='%s']",
-                       end_key->to_string().c_str());
-        return OLAP_ERR_INPUT_PARAMETER_ERROR;
-    }
-
-    OLAP_LOG_TRACE("end key in block %s row %d",
-                   _row_block_broker->position().to_string().c_str(),
-                   _row_block_broker->row_index());
-
-    _row_block_broker->set_end_row(_row_block_broker->position(), _row_block_broker->row_index());
-    _row_block_broker->set_end_row_flag(true);
-
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus OLAPData::prepare_block_read(
-        const RowCursor* start_key, bool find_start_key,
-        const RowCursor* end_key, bool find_end_key,
-        RowBlock** row_block) {
-    if (end_key != nullptr) {
-        auto res = set_end_key(end_key, find_end_key);
-        if (res != OLAP_SUCCESS) {
-            // Just ignore this error
-            VLOG(1) << "can't find end_key, end_key:" << end_key->to_string();
-        }
-    }
-    if (start_key != nullptr) {
-        auto row = find_row(*start_key, find_start_key, false);
-        if (row == nullptr) {
-            if (!eof()) {
-                // Some error happened
-                LOG(WARNING) << "failed to find start row row";
-                return OLAP_ERR_INIT_FAILED;
-            }
-            *row_block = nullptr;
-            return OLAP_ERR_DATA_EOF;
-        }
-    } else {
-        auto row = get_first_row();
-        if (row == nullptr) {
-            if (!eof()) {
-                LOG(WARNING) << "failed to get first row";
-                return OLAP_ERR_INIT_FAILED;
-            }
-            *row_block = nullptr;
-            return OLAP_ERR_DATA_EOF;
-        }
-    }
-    *row_block = _row_block_broker->get_row_block_to_read();
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus OLAPData::get_next_block(RowBlock** block) {
-    auto res = get_next_row_block(block, nullptr, nullptr);
-    if (eof()) {
-        *block = nullptr;
-        return OLAP_ERR_DATA_EOF;
-    }
-    if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "failed to get_next_row_block, res:" << res;
-        return res;
-    }
-    *block = _row_block_broker->get_row_block_to_read();
-    return OLAP_SUCCESS;
-}
-
-void OLAPData::_check_io_error(OLAPStatus res) {
-    if (is_io_error(res)) {
-        _olap_table->set_io_error();
-    }
-}
-
-OLAPData::RowBlockBroker::RowBlockBroker(
-        OLAPTable* olap_table, Rowset* olap_index, RuntimeState* runtime_state) :
-        _file_handler(),
-        _row_block_pos(),
-        _read_buffer(NULL),
-        _packed_row_block_size(0),
-        _row_block_header_size(0),
-        _unpacked_len(0),
-        _row_block(NULL),
-        _num_rows(0),
-        _row_index(0),
-        _end_block_position(),
-        _end_row_index(0),
-        _is_set_end_row(false),
-        _olap_table(olap_table),
-        _olap_index(olap_index),
-        _is_end_block(false),
-        _runtime_state(runtime_state) {
-    if (_olap_index != NULL) {
-        _olap_index->acquire();
-    }
-
-    _row_block_header_size = sizeof(RowBlockHeaderV2);
-    _data_read_buf_size = OLAP_DEFAULT_DATA_READ_BUF_SIZE;
-}
-
-OLAPData::RowBlockBroker::~RowBlockBroker() {
-    if (_olap_index != NULL) {
-        _olap_index->release();
-    }
-
-    this->release();
-
-    if (_runtime_state != NULL && _read_buffer != NULL) {
-        MemTracker::update_limits(-1 * _data_read_buf_size, _runtime_state->mem_trackers());
-    }
-
-    SAFE_DELETE_ARRAY(_read_buffer);
-}
-
-OLAPStatus OLAPData::RowBlockBroker::init() {
-    OLAPStatus res = OLAP_SUCCESS;
-
-    if ((res = _row_cursor.init(_olap_table->tablet_schema())) != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res);
-        return OLAP_ERR_INIT_FAILED;
-    }
-
-    _read_buffer = new(nothrow) char[_data_read_buf_size];
-    if (_read_buffer == NULL) {
-        OLAP_LOG_FATAL("fail to malloc _packed_row_block. [size=%lu]", _data_read_buf_size);
-        return OLAP_ERR_MALLOC_ERROR;
-    }
-
-    if (_runtime_state != NULL) {
-        MemTracker::update_limits(_data_read_buf_size, _runtime_state->mem_trackers());
-        if (MemTracker::limit_exceeded(*_runtime_state->mem_trackers())) {
-            return OLAP_ERR_FETCH_MEMORY_EXCEEDED;
-        }
-    }
-
-    return OLAP_SUCCESS;
-}
-
-const RowCursor* OLAPData::RowBlockBroker::first() {
-    _row_index = 0;
-    _row_block->get_row(_row_index, &_row_cursor);
-    return &_row_cursor;
-}
-
-const RowCursor* OLAPData::RowBlockBroker::last() {
-    _row_index = _num_rows - 1;
-    _row_block->get_row(_row_index, &_row_cursor);
-    return &_row_cursor;
-}
-
-const RowCursor* OLAPData::RowBlockBroker::next(bool* end_of_row_block) {
-    if (++_row_index >= _num_rows) {
-        (end_of_row_block == NULL || (*end_of_row_block = true));
-        return NULL;
-    }
-
-    (end_of_row_block == NULL || (*end_of_row_block = false));
-
-    _row_block->get_row(_row_index, &_row_cursor);
-
-    return &_row_cursor;
-}
-
-const RowCursor* OLAPData::RowBlockBroker::find_row(const RowCursor& key,
-                                                    bool find_last_key,
-                                                    bool* end_of_row_block) {
-    if (_row_block->find_row(key, find_last_key, &_row_index) != OLAP_SUCCESS) {
-        VLOG(3) << "fail to find row from row block. [key='" << key.to_string() << "']";
-        return NULL;
-    }
-
-    if (_row_index >= _num_rows) {
-        (end_of_row_block == NULL || (*end_of_row_block = true));
-        return NULL;
-    }
-
-    _row_block->get_row(_row_index, &_row_cursor);
-
-    (end_of_row_block == NULL || (*end_of_row_block = false));
-    
-    return &_row_cursor;
-}
-
-const RowCursor* OLAPData::RowBlockBroker::get_row(uint32_t row_index) {
-    _row_index = row_index;
-    if (_row_index >= _num_rows) {
-        return NULL;
-    }
-    _row_block->get_row(_row_index, &_row_cursor);
-    return &_row_cursor;
-}
-
-const RowCursor* OLAPData::RowBlockBroker::current() {
-    if (_row_index >= _num_rows) {
-        return NULL;
-    }
-
-    if (_row_block == NULL) {
-        OLAP_LOG_WARNING("didn't have row block.");
-        return NULL;
-    }
-
-    return &_row_cursor;
-}
-
-OLAPStatus OLAPData::RowBlockBroker::change_to(const RowBlockPosition& row_block_pos) {
-    OLAPStatus res = OLAP_SUCCESS;
-
-    // 先将持有的row_block释放
-    this->release();
-
-    _row_block_pos = row_block_pos;
-
-    if ((res = _get_row_block(row_block_pos)) != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("fail to get row block.[res=%d]", res);
-        return res;
-    }
-
-    // TODO(hujie01) 考虑删除条件的版本
-    // _row_block->restore();
-    
-    if (_is_set_end_row && (_row_block_pos == _end_block_position)) {
-        _num_rows = _end_row_index;
-        _is_end_block = true;
-    } else {
-        _num_rows = _row_block->row_num();
-        _is_end_block = false;
-    }
-
-    _row_index = 0;
-
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus OLAPData::RowBlockBroker::release() {
-    if (_runtime_state != NULL && _row_block != NULL) {
-        MemTracker::update_limits(-1 * _row_block->buf_len(), _runtime_state->mem_trackers());
-    }
-    SAFE_DELETE(_row_block);
-    _row_block = NULL;
-    _num_rows = 0;
-    _row_index = 0;
-
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus OLAPData::RowBlockBroker::_get_row_block(const RowBlockPosition& row_block_pos) {
-    uint32_t num_rows = 0;
-    uint32_t packed_len = 0;
-    uint32_t checksum = 0;
-    OLAPStatus res = OLAP_SUCCESS;
-    RowBlockInfo row_block_info;
-    string file_name;
-    RowBlockHeaderV2* row_block_header = NULL;
-
-    if (row_block_pos.block_size > _data_read_buf_size) {
-        uint64_t max_packed_row_block_size = config::max_packed_row_block_size;
-
-        // if max_unpacked_row_block_size not set or block_size still larger than
-        // user specified max_unpacked_row_block_size
-        if (max_packed_row_block_size == 0
-                || row_block_pos.block_size > max_packed_row_block_size) {
-            OLAP_LOG_WARNING("row block size lager than buf. [block_size=%d]",
-                             row_block_pos.block_size);
-            return OLAP_ERR_ROWBLOCK_READ_INFO_ERROR;
-        }
-
-        SAFE_DELETE_ARRAY(_read_buffer);
-
-        _read_buffer = new(nothrow) char[row_block_pos.block_size + sizeof(RowBlockHeaderV2)];
-        if (_read_buffer == NULL) {
-            if (_runtime_state != NULL) {
-                MemTracker::update_limits(-1 * _data_read_buf_size, _runtime_state->mem_trackers());
-            }
-            OLAP_LOG_WARNING("malloc for read buffer failed. size=%u", row_block_pos.block_size);
-            return OLAP_ERR_MALLOC_ERROR;
-        }
-
-        if (_runtime_state != NULL) {
-            MemTracker::update_limits(
-                    row_block_pos.block_size - _data_read_buf_size, _runtime_state->mem_trackers());
-            _data_read_buf_size = row_block_pos.block_size;
-            if (MemTracker::limit_exceeded(*_runtime_state->mem_trackers())) {
-                return OLAP_ERR_FETCH_MEMORY_EXCEEDED;
-            }
-        }
-    }
-
-    file_name = _olap_index->construct_data_file_path(_olap_index->rowset_id(), row_block_pos.segment);
-
-    if ((res = _file_handler.open_with_cache(file_name, O_RDONLY)) != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to open file. [file_name=" << file_name << "]";
-        goto GET_ROW_BLOCK_ERROR;
-    }
-    
-    res = _file_handler.pread(_read_buffer, row_block_pos.block_size, row_block_pos.data_offset);
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("fail to read block from file. [offset=%d, size=%d, buf_size=%d]",
-                         row_block_pos.data_offset,
-                         row_block_pos.block_size,
-                         row_block_pos.block_size);
-        goto GET_ROW_BLOCK_ERROR;
-    }
-
-    // 处理row_block header, 这地方比较丑,先写,后边和filehelper一起优化
-    row_block_header = reinterpret_cast<RowBlockHeaderV2*>(_read_buffer);
-    if (row_block_header->magic_num != 0) {
-        RowBlockHeader* old_row_block_header = reinterpret_cast<RowBlockHeader*>(_read_buffer);
-        num_rows = old_row_block_header->num_rows;
-        packed_len = old_row_block_header->packed_len;
-        checksum = old_row_block_header->checksum;
-        _unpacked_len = _olap_table->get_row_size() * num_rows;
-        _row_block_header_size = sizeof(RowBlockHeader);
-    } else {
-        num_rows = row_block_header->num_rows;
-        packed_len = row_block_header->packed_len;
-        checksum = row_block_header->checksum;
-        _unpacked_len = row_block_header->unpacked_len;
-        _row_block_header_size = sizeof(RowBlockHeaderV2);
-    }
-
-    if (_row_block_pos.block_size != packed_len + _row_block_header_size) {
-        OLAP_LOG_WARNING("row block len on disk not match in index. "
-                         "[block_size_in_index=%u block_size_in_row_block_header=%u]",
-                         _row_block_pos.block_size,
-                         packed_len + _row_block_header_size);
-        res = OLAP_ERR_ROWBLOCK_READ_INFO_ERROR;
-        goto GET_ROW_BLOCK_ERROR;
-    }
-
-    if (num_rows < 1) {
-        OLAP_LOG_WARNING("row block is empty. [num_rows=%d]", num_rows);
-        res = OLAP_ERR_ROWBLOCK_READ_INFO_ERROR;
-        goto GET_ROW_BLOCK_ERROR;
-    }
-
-    _packed_row_block_size = packed_len;
-
-    // 创建新的row_block
-    row_block_info.checksum = checksum;
-    row_block_info.row_num = num_rows;
-    row_block_info.unpacked_len = _unpacked_len;
-    row_block_info.data_file_type = OLAP_DATA_FILE;
-    row_block_info.null_supported = _olap_index->get_null_supported(0);
-
-    if ((_row_block = new(nothrow) RowBlock(_olap_table->tablet_schema())) == NULL) {
-        OLAP_LOG_FATAL("fail to malloc RowBlock. [size=%ld]", sizeof(RowBlock));
-        res = OLAP_ERR_MALLOC_ERROR;
-        goto GET_ROW_BLOCK_ERROR;
-    }
-
-    if ((res = _row_block->init(row_block_info)) != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("fail to init row block. [res=%d]", res);
-        goto GET_ROW_BLOCK_ERROR;
-    }
-
-    if (_runtime_state != NULL) {
-        MemTracker::update_limits(_row_block->buf_len(), _runtime_state->mem_trackers());
-        if (MemTracker::limit_exceeded(*_runtime_state->mem_trackers())) {
-            res = OLAP_ERR_FETCH_MEMORY_EXCEEDED;
-            OLAP_GOTO(GET_ROW_BLOCK_ERROR);
-        }
-    }
-
-    res = _row_block->decompress(_read_buffer + _row_block_header_size,
-                                 packed_len,
-                                 OLAP_COMP_STORAGE);
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("fail to decompress the row block. [res=%d, packed_len=%d]",
-                         res,
-                         packed_len);
-        goto GET_ROW_BLOCK_ERROR;
-    }
-
-    return OLAP_SUCCESS;
-
-GET_ROW_BLOCK_ERROR:
-    if (_runtime_state != NULL && _row_block != NULL) {
-        MemTracker::update_limits(-1 * _row_block->buf_len(), _runtime_state->mem_trackers());
-    }
-    SAFE_DELETE(_row_block);
-    return res;
-}
-
-}  // namespace doris
diff --git a/be/src/olap/olap_data.h b/be/src/olap/olap_data.h
deleted file mode 100644
index 7498d53c..00000000
--- a/be/src/olap/olap_data.h
+++ /dev/null
@@ -1,371 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_SRC_OLAP_OLAP_DATA_H
-#define DORIS_BE_SRC_OLAP_OLAP_DATA_H
-
-#include <map>
-#include <string>
-#include <vector>
-
-#include "olap/i_data.h"
-#include "olap/olap_common.h"
-#include "olap/olap_define.h"
-#include "olap/olap_index.h"
-#include "olap/olap_table.h"
-#include "olap/row_block.h"
-
-namespace doris {
-
-// Row Storage Table is deprecated.
-// This file will be removed in succedent release.
-
-class OLAPTable;
-class RowBlock;
-class RowCursor;
-
-// Class for managing data files.
-//
-// Typically, an 'OLAPData' represent a single version data file. Internally, an OLAPData is split
-// into segments of typically 4GB files. Within each segment, the data is stored as RowBlocks.
-//
-// The interface for 'OLAPData' is cursor based. There are methods for positioning the cursor 
-// inside the data file, and then get_next_row() will fetch the next row.
-//
-// The original OLAPData File is a kind of row-column mixed file type. And it is named as
-// RAW_COLUMN_MIXED.
-//
-// To reduce the number of code to refactor, the class name "OLAPData" is reserved for
-// the original RAW_COLUMN_MIXED.
-class OLAPData: public IData {
-public:
-    explicit OLAPData(Rowset* index);
-    virtual ~OLAPData();
-
-    // 初始化, 和unpickle统一到同一流程上
-    virtual OLAPStatus init();
-
-    OLAPStatus get_first_row_block(RowBlock** row_block,
-                               const char** packed_row_block,
-                               uint32_t* packed_row_block_size);
-
-    // Advances to the next row block and returns it through row_block.
-    // Also returns the corresponding packed_row_block and
-    // packed_row_block_size, which is the size of packed_row_block.
-    // Returns OLAP_SUCCESS on success or end of file has been reached;
-    // other if in case of error.  The class retains ownership of row_block and
-    // packed_row_block.
-    OLAPStatus get_next_row_block(RowBlock** row_block,
-                              const char** packed_row_block,
-                              uint32_t* packed_row_block_size);
-
-    virtual OLAPStatus get_next_row_block(RowBlock** row_block);
-
-    virtual OLAPStatus get_first_row_block(RowBlock** row_block);
-
-    RowBlock* seek_and_get_row_block(const RowBlockPosition& position);
-
-    // Points the internal cursor to either the first row or the last row.
-    // Returns NULL in case of an error.
-    const RowCursor* get_first_row();
-
-    // Advances the internal cursor to the next row and returns that row.
-    // Sets _eof if there is no more row left.
-    const RowCursor* get_next_row();
-
-    // Points internal cursor to the first row equal to or larger than 'key'.
-    // to key. Returns a pointer to the row or NULL if 1) there is an
-    // error, or 2) the key exceeds any row in the table.
-    const RowCursor* find_row(const RowCursor& key, bool find_last_key, bool is_end_key);
-
-    // find_last_end_key false:<; true:<=
-    OLAPStatus set_end_key(const RowCursor* end_key, bool find_last_end_key);
-
-    OLAPStatus prepare_block_read(
-        const RowCursor* start_key, bool find_start_key,
-        const RowCursor* end_key, bool find_end_key,
-        RowBlock** block) override;
-
-    OLAPStatus get_next_block(RowBlock** block) override;
-
-    // The following four functions are used for creating new date
-    // files. add_segment() and finalize_segment() start and end a new
-    // segment respectively, while add_row_block() and add_packed_rowblock()
-    // add a new data block to the current segment.(only writer)
-    OLAPStatus add_segment();
-
-    // TODO(fdy): 未实现方法,等待有使用需求时再实现
-    OLAPStatus add_packed_rowblock(
-            const char* packed_row_block, const uint32_t packed_row_block_size);
-
-    // @brief add row block into OLAPData, row block will be compressed before writing.
-    // @param [in] row_block
-    // @param [out] start_data_offset  start data offset of the row block in OLAPData Segment. It
-    //                                 is used as data offset of corresponding index item.
-    // @param [out] end_data_offset    end data offset of the row block in OLAPData Segment. it
-    //                                 equals to the current segment file length.
-    // @return  OLAPStatus  OLAP_SUCCESS if succeed, or else OLAP_ERR_XXX
-    // @note
-    OLAPStatus add_row_block(RowBlock* row_block,
-                             uint32_t* start_data_offset,
-                             uint32_t* end_data_offset);
-
-    // 结束segment,回写头部
-    OLAPStatus finalize_segment(uint32_t* data_offset);
-
-    void sync();
-
-    // 腌制和反腌制方法
-    // 腌制要减index的ref, munmap和close fd, 用于存储在session中
-    virtual OLAPStatus pickle();
-    
-    // 反腌制要加index的ref, open和mmap fd, 从session中拿出之后为继续使用做准备
-    virtual OLAPStatus unpickle();
-
-    Version version() const {
-        return olap_index()->version();
-    }
-    
-    VersionHash version_hash() const {
-        return olap_index()->version_hash();
-    }
-    
-    uint32_t num_segments() const {
-        return olap_index()->num_segments();
-    }
-    
-private:
-    // RowBlock代理,内部类,可以代理指定position的RowBlock,为RowBlock提供类似Iterator服务。
-    class RowBlockBroker {
-    public:
-        RowBlockBroker(OLAPTable* olap_table, Rowset* olap_index, RuntimeState* runtime_state);
-        ~RowBlockBroker();
-
-        OLAPStatus init();
-
-        // 根据block position,在文件中定位row_block
-        OLAPStatus change_to(const RowBlockPosition& row_block_pos);
-        // 释放当前持有的row_block
-        OLAPStatus release();
-
-        const RowCursor* first();
-        const RowCursor* last();
-        const RowCursor* current();
-
-        const RowCursor* get_row(uint32_t row_index);
-        const RowCursor* next(bool* end_of_row_block);
-        const RowCursor* find_row(const RowCursor& key, bool find_last_key, bool* end_of_row_block);
-
-        void set_end_row(const RowBlockPosition& end_block_position, uint32_t end_row_index) {
-            _end_block_position = end_block_position;
-            _end_row_index = end_row_index;
-        }
-
-        void set_end_row_flag(bool flg) {
-            _is_set_end_row = flg;
-        }
-
-        const RowBlockPosition& end_block_position() {
-            return _end_block_position;
-        }
-
-        const uint32_t end_row_index() {
-            return _end_row_index;
-        }
-
-        bool get_set_end_row_flag() {
-            return _is_set_end_row;
-        }
-
-        const RowBlockPosition& position() {
-            return _row_block_pos;
-        }
-        
-        const uint32_t row_index() {
-            return _row_index;
-        }
-
-        RowBlock* row_block() {
-            return _row_block;
-        }
-
-        RowBlock* get_row_block_to_read() {
-            _row_block->set_pos(_row_index);
-            _row_block->set_limit(_num_rows);
-            return _row_block;
-        }
-
-        const char* packed_row_block() {
-            return _read_buffer + _row_block_header_size;
-        }
-
-        const uint32_t packed_row_block_size() {
-            return _packed_row_block_size;
-        }
-
-        const bool is_end_block() {
-            return _is_end_block;
-        }
-
-        uint32_t num_rows() const { return _num_rows; }
-
-        Tuple* get_next_tuple();
-        
-    private:
-        OLAPStatus _get_row_block(const RowBlockPosition& row_block_pos);
-
-        FileHandler _file_handler; // 内部持有资源的管理者
-        RowBlockPosition _row_block_pos;
-        char* _read_buffer; // 包含RowBlockHeader和数据
-        uint32_t _packed_row_block_size; // 数据的长度
-        uint32_t _row_block_header_size; // Header的长度
-        uint32_t _unpacked_len; // 解压后的大小 由于header只有指针,没法写进去
-        
-        RowBlock* _row_block;
-        uint32_t _num_rows;
-        uint32_t _row_index; // 记录_row_cursor在当前row_block中的位置
-        RowCursor _row_cursor;
-
-        RowBlockPosition _end_block_position;
-        uint32_t _end_row_index;
-        bool _is_set_end_row;
-
-        OLAPTable* _olap_table;
-        Rowset* _olap_index;
-
-        uint64_t _data_read_buf_size;
-        bool _is_end_block;
-        RuntimeState* _runtime_state;
-    };  // end of class RowBlockBroker
-
-    // 存在于磁盘上每个row_block之前的头信息
-    struct __attribute__((packed)) RowBlockHeader {
-        uint32_t checksum;      // 压缩之前数据的校验和
-        uint32_t packed_len;    // 压缩后的长度,目前row block内部用不到
-        uint32_t num_rows;      // block内有效数据行数
-    };
-
-    struct __attribute__((packed)) RowBlockHeaderV2 {
-        uint32_t magic_num;
-        uint32_t version;
-        uint32_t checksum;      // 压缩之前数据的校验和
-        uint32_t packed_len;    // 压缩后的长度,目前row block内部用不到
-        uint32_t num_rows;      // block内有效数据行数
-        uint32_t unpacked_len;  // 解压缩之后的长度
-    };
-
-    static const uint32_t OLAP_DEFAULT_DATA_READ_BUF_SIZE =
-        OLAP_DEFAULT_MAX_PACKED_ROW_BLOCK_SIZE + sizeof(RowBlockHeaderV2);
-
-    // 一个Session期间需要保存的信息,避免同一个Session的多个request发起多次定位seek。
-    struct SessionStatus {
-        RowBlockPosition position;
-        uint32_t row_index;
-        RowBlockPosition end_block_position;
-        uint32_t end_row_index;
-        bool is_set_end_row;
-    };
-
-    // 供OLAPData写流程所使用的参数
-    struct WriteDescriptor {
-        WriteDescriptor() : checksum(0), segment(0), packed_buffer(NULL) {};
-
-        FileHandler file_handle;
-        FileHeader<OLAPDataHeaderMessage> file_header;
-        uint32_t checksum;
-        uint32_t segment;
-        char* packed_buffer;    // buffer size = OLAP_MAX_PACKED_ROW_BLOCK_SIZE
-    };
-
-    void _check_io_error(OLAPStatus res);
-
-    OLAPTable* _olap_table;
-
-    // 当前olapdata是否处于腌制状态
-    bool _is_pickled;
-    SessionStatus* _session_status;
-    // RowBlock管理者,负责管理RowBlock及所需资源,如文件描述符等
-    RowBlockBroker* _row_block_broker;
-    // for writing
-    WriteDescriptor* _write_descriptor;
-
-    // Add the semicolon just for eagle.py!
-    DISALLOW_COPY_AND_ASSIGN(OLAPData);
-};
-
-class OLAPDataComparator {
-public:
-    OLAPDataComparator(RowBlockPosition position,
-                       OLAPData* olap_data,
-                       const Rowset* index,
-                       RowCursor* helper_cursor) :
-            _start_block_position(position),
-            _olap_data(olap_data),
-            _index(index),
-            _helper_cursor(helper_cursor) {}
-
-    // This class is used as functor. So, destructor do nothing here
-    ~OLAPDataComparator() {}
-
-    // less comparator function
-    bool operator()(const iterator_offset_t& index, const RowCursor& key) const {
-        return _compare(index, key, COMPARATOR_LESS);
-    }
-
-    // larger comparator function
-    bool operator()(const RowCursor& key, const iterator_offset_t& index) const {
-        return _compare(index, key, COMPARATOR_LARGER);
-    }
-
-private:
-    bool _compare(const iterator_offset_t& index,
-                  const RowCursor& key,
-                  ComparatorEnum comparator_enum) const {
-        OLAPStatus res = OLAP_SUCCESS;
-        RowBlockPosition position = _start_block_position;
-        if ((res = _index->advance_row_block(index, &position)) != OLAP_SUCCESS) {
-            OLAP_LOG_FATAL("fail to advance row block. [res=%d]", res);
-            throw ComparatorException();
-        }
-
-        RowBlock* block = _olap_data->seek_and_get_row_block(position);
-        if (block == NULL) {
-            OLAP_LOG_FATAL("fail to seek and get row block.");
-            throw ComparatorException();
-        }
-
-        // 取block里的最后一条数据与key进行比较,返回小于的结果
-        // TODO(hujie01): 比较block暂时不使用过滤条件
-        uint32_t row_num = block->row_block_info().row_num;
-        block->get_row(row_num - 1, _helper_cursor);
-
-        if (comparator_enum == COMPARATOR_LESS) {
-            return _helper_cursor->cmp(key) < 0;
-        } else {
-            return _helper_cursor->cmp(key) > 0;
-        }
-    }
-
-    const RowBlockPosition _start_block_position;
-    OLAPData* _olap_data;
-    const Rowset* _index;
-    RowCursor* _helper_cursor;
-};
-
-}  // namespace doris
-
-#endif // DORIS_BE_SRC_OLAP_OLAP_DATA_H
diff --git a/be/src/olap/olap_engine.cpp b/be/src/olap/olap_engine.cpp
index 0b40a93b..bcfbca6b 100644
--- a/be/src/olap/olap_engine.cpp
+++ b/be/src/olap/olap_engine.cpp
@@ -44,7 +44,7 @@
 #include "olap/schema_change.h"
 #include "olap/store.h"
 #include "olap/utils.h"
-#include "olap/writer.h"
+#include "olap/data_writer.h"
 #include "util/time.h"
 #include "util/doris_metrics.h"
 #include "util/pretty_printer.h"
@@ -251,7 +251,7 @@ OLAPStatus OLAPEngine::load_one_tablet(
         if (OLAPEngine::get_instance()->drop_table(tablet_id, schema_hash) != OLAP_SUCCESS) {
             OLAP_LOG_WARNING("fail to drop table when create table failed. "
                              "[tablet=%ld schema_hash=%d]",
-                             tablet_id, schema_hash); 
+                             tablet_id, schema_hash);
         }
 
         return OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR;
@@ -266,6 +266,117 @@ OLAPStatus OLAPEngine::load_one_tablet(
     return OLAP_SUCCESS;
 }
 
+void OLAPEngine::check_none_row_oriented_table(const std::vector<OlapStore*>& stores) {
+    // Checks stores one by one; a per-store failure is logged and the scan continues.
+    for (auto store : stores) {
+        auto res = _check_none_row_oriented_table_in_store(store);
+        if (res != OLAP_SUCCESS) {
+            LOG(WARNING) << "fail to check row-oriented tables. res=" << res
+                << ", store=" << store->path();
+        }
+    }
+}
+
+OLAPStatus OLAPEngine::_check_none_row_oriented_table_in_store(OlapStore* store) {
+    std::string store_path = store->path();
+    LOG(INFO) << "start to check tablets from store_path:" << store_path;
+
+    // Decide between the meta-based check and the legacy header-file walk.
+    bool is_header_converted = false;
+    OLAPStatus res = OlapHeaderManager::get_header_converted(store, is_header_converted);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "get convert flag from meta failed";
+        return res;
+    }
+    if (is_header_converted) {
+        // Headers have been converted into the store's meta: let the store run the check.
+        OLAPStatus s = store->check_none_row_oriented_table_in_store(this);
+        if (s != OLAP_SUCCESS) {
+            LOG(WARNING) << "there is failure when checking table headers, path:" << store_path;
+        }
+        return s;
+    }
+
+    // Compatible with the old header layout:
+    // walk every data directory and check each header file.
+    LOG(INFO) << "check has none row-oriented table from header files";
+
+    // get all shards
+    set<string> shards;
+    if (dir_walk(store_path + DATA_PREFIX, &shards, NULL) != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to walk dir. [root=" << store_path << "]";
+        return OLAP_ERR_INIT_FAILED;
+    }
+
+    for (const auto& shard : shards) {
+        // get all tablets
+        set<string> tablets;
+        string one_shard_path = store_path + DATA_PREFIX + '/' + shard;
+        if (dir_walk(one_shard_path, &tablets, NULL) != OLAP_SUCCESS) {
+            LOG(WARNING) << "fail to walk dir. [root=" << one_shard_path << "]";
+            continue;
+        }
+
+        for (const auto& tablet : tablets) {
+            // Each sub-directory of a tablet is one schema hash (an OLAPTable, not a Rowset).
+            set<string> schema_hashes;
+            string one_tablet_path = one_shard_path + '/' + tablet;
+            if (dir_walk(one_tablet_path, &schema_hashes, NULL) != OLAP_SUCCESS) {
+                LOG(WARNING) << "fail to walk dir. [root=" << one_tablet_path << "]";
+                continue;
+            }
+            TTabletId tablet_id = strtoul(tablet.c_str(), NULL, 10);
+
+            for (const auto& schema_hash : schema_hashes) {
+                TSchemaHash tablet_schema_hash = strtoul(schema_hash.c_str(), NULL, 10);
+                string schema_hash_path = one_tablet_path + '/' + schema_hash;
+
+                // A failure on one table is logged; the walk continues with the next one.
+                if (check_none_row_oriented_table_in_path(
+                        store,
+                        tablet_id,
+                        tablet_schema_hash,
+                        schema_hash_path) != OLAP_SUCCESS) {
+                    OLAP_LOG_WARNING("fail to check one table, but continue. [path='%s']",
+                                     schema_hash_path.c_str());
+                }
+            }
+        }
+    }
+    return OLAP_SUCCESS;
+}
+
+OLAPStatus OLAPEngine::check_none_row_oriented_table_in_path(
+        OlapStore* store, TTabletId tablet_id,
+        SchemaHash schema_hash, const string& schema_hash_path) {
+    string header_path = schema_hash_path + "/" + std::to_string(tablet_id) + ".hdr";
+    path boost_schema_hash_path(schema_hash_path);
+
+    // NOTE: on a missing header or a failed table creation below, the schema-hash
+    // directory is moved to trash, so this check is not read-only.
+    if (access(header_path.c_str(), F_OK) != 0) {
+        LOG(WARNING) << "fail to find header file. [header_path=" << header_path << "]";
+        move_to_trash(boost_schema_hash_path, boost_schema_hash_path);
+        return OLAP_ERR_FILE_NOT_EXIST;
+    }
+
+    auto olap_table = OLAPTable::create_from_header_file_for_check(
+            tablet_id, schema_hash, header_path);
+    if (olap_table == NULL) {
+        LOG(WARNING) << "fail to load table. [header_path=" << header_path << "]";
+        move_to_trash(boost_schema_hash_path, boost_schema_hash_path);
+        return OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR;
+    }
+
+    LOG(INFO) << "data_file_type:" << olap_table->data_file_type();
+    if (olap_table->data_file_type() == OLAP_DATA_FILE) {
+        LOG(FATAL) << "Not support row-oriented table any more. Please convert it to column-oriented table."
+                   << " tablet=" << olap_table->full_name();
+    }
+
+    return OLAP_SUCCESS;
+}
+
 void OLAPEngine::load_stores(const std::vector<OlapStore*>& stores) {
     std::vector<std::thread> threads;
     for (auto store : stores) {
@@ -327,8 +438,8 @@ OLAPStatus OLAPEngine::open() {
     _max_base_compaction_task_per_disk = (base_compaction_num_threads + file_system_num - 1) / file_system_num;
 
     auto stores = get_stores();
+    check_none_row_oriented_table(stores);
     load_stores(stores);
-
     // 取消未完成的SchemaChange任务
     _cancel_unfinished_schema_change();
 
@@ -1367,7 +1478,7 @@ OLAPStatus OLAPEngine::create_init_version(TTabletId tablet_id, SchemaHash schem
                    version.first, version.second);
 
     OLAPTablePtr table;
-    IWriter* writer = NULL;
+    ColumnDataWriter* writer = NULL;
     Rowset* new_rowset = NULL;
     OLAPStatus res = OLAP_SUCCESS;
     std::vector<Rowset*> index_vec;
@@ -1396,7 +1507,7 @@ OLAPStatus OLAPEngine::create_init_version(TTabletId tablet_id, SchemaHash schem
         }
 
         // Create writer, which write nothing to table, to generate empty data file
-        writer = IWriter::create(table, new_rowset, false);
+        writer = ColumnDataWriter::create(table, new_rowset, false);
         if (writer == NULL) {
             LOG(WARNING) << "fail to create writer. [table=" << table->full_name() << "]";
             res = OLAP_ERR_MALLOC_ERROR;
diff --git a/be/src/olap/olap_engine.h b/be/src/olap/olap_engine.h
index 15a8b904..f54edc22 100644
--- a/be/src/olap/olap_engine.h
+++ b/be/src/olap/olap_engine.h
@@ -183,6 +183,12 @@ class OLAPEngine {
     // 获取cache的使用情况信息
     void get_cache_status(rapidjson::Document* document) const;
 
+    void check_none_row_oriented_table(const std::vector<OlapStore*>& stores);
+    OLAPStatus check_none_row_oriented_table_in_path(
+                    OlapStore* store, TTabletId tablet_id,
+                    SchemaHash schema_hash, const std::string& schema_hash_path);
+    OLAPStatus _check_none_row_oriented_table_in_store(OlapStore* store);
+
     // Note: 这里只能reload原先已经存在的root path,即re-load启动时就登记的root path
     // 是允许的,但re-load全新的path是不允许的,因为此处没有彻底更新ce调度器信息
     void load_stores(const std::vector<OlapStore*>& stores);
diff --git a/be/src/olap/olap_header.cpp b/be/src/olap/olap_header.cpp
index ffd2ed84..7d838917 100644
--- a/be/src/olap/olap_header.cpp
+++ b/be/src/olap/olap_header.cpp
@@ -151,6 +151,31 @@ OLAPStatus OLAPHeader::load_and_init() {
     return init();
 }
 
+// Reads and deserializes the header file only; unlike load_and_init(), init() is not called.
+OLAPStatus OLAPHeader::load_for_check() {
+    FileHeader<OLAPHeaderMessage> file_header;
+    FileHandler file_handler;
+
+    if (file_handler.open(_file_name.c_str(), O_RDONLY) != OLAP_SUCCESS) {
+        OLAP_LOG_WARNING("fail to open header file. [file='%s']", _file_name.c_str());
+        return OLAP_ERR_IO_ERROR;
+    }
+
+    // In file_header.unserialize(), it validates file length, signature, checksum of protobuf.
+    if (file_header.unserialize(&file_handler) != OLAP_SUCCESS) {
+        OLAP_LOG_WARNING("fail to unserialize header. [path='%s']", _file_name.c_str());
+        return OLAP_ERR_PARSE_PROTOBUF_ERROR;
+    }
+
+    try {
+        CopyFrom(file_header.message());
+    } catch (...) {
+        OLAP_LOG_WARNING("fail to copy protocol buffer object. [path='%s']", _file_name.c_str());
+        return OLAP_ERR_PARSE_PROTOBUF_ERROR;
+    }
+    return OLAP_SUCCESS;
+}
+
 OLAPStatus OLAPHeader::save() {
     return save(_file_name);
 }
diff --git a/be/src/olap/olap_header.h b/be/src/olap/olap_header.h
index b9d6e448..8bc6d042 100644
--- a/be/src/olap/olap_header.h
+++ b/be/src/olap/olap_header.h
@@ -46,6 +46,7 @@ class OLAPHeader : public OLAPHeaderMessage {
     // In load_and_init(), we will validate olap header file, which mainly include
     // tablet schema, delta version and so on.
     OLAPStatus load_and_init();
+    OLAPStatus load_for_check();
 
     // Saves the header to disk, returning true on success.
     OLAPStatus save();
diff --git a/be/src/olap/olap_header_manager.cpp b/be/src/olap/olap_header_manager.cpp
index 9bd04dca..a6fccd13 100755
--- a/be/src/olap/olap_header_manager.cpp
+++ b/be/src/olap/olap_header_manager.cpp
@@ -126,19 +126,7 @@ OLAPStatus OlapHeaderManager::set_converted_flag(OlapStore* store) {
 }
 
 OLAPStatus OlapHeaderManager::traverse_headers(OlapMeta* meta,
-        std::function<bool(long, long, const std::string&)> const& func) {
-    auto traverse_header_func = [&func](const std::string& key, const std::string& value) -> bool {
-        std::vector<std::string> parts;
-        // key format: "hdr_" + tablet_id + "_" + schema_hash
-        split_string<char>(key, '_', &parts);
-        if (parts.size() != 3) {
-            LOG(WARNING) << "invalid header key:" << key << ", splitted size:" << parts.size();
-            return true;
-        }
-        TTabletId tablet_id = std::stol(parts[1].c_str(), NULL, 10);
-        TSchemaHash schema_hash = std::stol(parts[2].c_str(), NULL, 10);
-        return func(tablet_id, schema_hash, value);
-    };
+        std::function<bool(const std::string&, const std::string&)> const& traverse_header_func) {
     OLAPStatus status = meta->iterate(META_COLUMN_FAMILY_INDEX, HEADER_PREFIX, traverse_header_func);
     return status;
 }
# Review: header side of the traverse_headers() signature change; the callback
# now takes (key, value) strings. Nit: the new continuation line is indented
# 8 spaces where the line it replaces used 12.
diff --git a/be/src/olap/olap_header_manager.h b/be/src/olap/olap_header_manager.h
index c74be38c..ba5d3839 100644
--- a/be/src/olap/olap_header_manager.h
+++ b/be/src/olap/olap_header_manager.h
@@ -39,7 +39,7 @@ class OlapHeaderManager {
     static OLAPStatus remove(OlapStore* store, TTabletId tablet_id, TSchemaHash schema_hash);
 
     static OLAPStatus traverse_headers(OlapMeta* meta,
-            std::function<bool(long, long, const std::string&)> const& func);
+        std::function<bool(const std::string&, const std::string&)> const& traverse_header_func);
 
     static OLAPStatus get_header_converted(OlapStore* store, bool& flag);
 
# Review: only the include switch is kept for this file. The patch's second
# hunk rewrote the comment word "olapindex" as "olap/ndex". That looks like a
# search-and-replace artifact of the include rename rather than an intended
# edit, so that hunk is dropped and the comment stays as it was.
diff --git a/be/src/olap/olap_index.cpp b/be/src/olap/olap_index.cpp
index 57bbe844..e41ac07f 100644
--- a/be/src/olap/olap_index.cpp
+++ b/be/src/olap/olap_index.cpp
@@ -22,7 +22,7 @@
 #include <cmath>
 #include <fstream>
 
-#include "olap/olap_data.h"
+#include "olap/column_data.h"
 #include "olap/olap_table.h"
 #include "olap/row_block.h"
 #include "olap/row_cursor.h"
# Review: mechanical rename only. column_data.h replaces olap_data.h, and the
# olap_data_sources vectors in _create_snapshot_files() and
# storage_medium_migrate() now hold ColumnData* instead of IData*.
diff --git a/be/src/olap/olap_snapshot.cpp b/be/src/olap/olap_snapshot.cpp
index 54d7c301..ab2c7c0c 100644
--- a/be/src/olap/olap_snapshot.cpp
+++ b/be/src/olap/olap_snapshot.cpp
@@ -33,7 +33,7 @@
 #include "common/status.h"
 #include "olap/field.h"
 #include "olap/olap_common.h"
-#include "olap/olap_data.h"
+#include "olap/column_data.h"
 #include "olap/olap_define.h"
 #include "olap/rowset.h"
 #include "olap/olap_table.h"
@@ -304,7 +304,7 @@ OLAPStatus OLAPEngine::_create_snapshot_files(
     ref_olap_table->obtain_header_rdlock();
     header_locked = true;
 
-    vector<IData*> olap_data_sources;
+    vector<ColumnData*> olap_data_sources;
     OLAPHeader* new_olap_header = nullptr;
     do {
         // get latest version
@@ -663,7 +663,7 @@ OLAPStatus OLAPEngine::storage_medium_migrate(
         return OLAP_SUCCESS;
     }
 
-    vector<IData*> olap_data_sources;
+    vector<ColumnData*> olap_data_sources;
     tablet->obtain_push_lock();
 
     do {
# Review notes for olap_table.cpp:
# - create_from_header_file_for_check(): the returned shared_ptr's deleter also
#   frees the OLAPHeader allocated here. ~OLAPTable() returns before touching
#   _header for check-only tables, so nothing else frees it. The NULL check on
#   std::make_shared was dead code, because it throws std::bad_alloc rather
#   than returning null.
# - recover_tablet_until_specfic_version(): std::shared_ptr<OLAPTable>(this)
#   creates a second owner with its own refcount, and releasing that owner
#   deletes `this`. shared_from_this() shares the existing owner instead;
#   OLAPTable derives from std::enable_shared_from_this (see olap_table.h).
#   The writer is also checked for NULL before writer->finalize().
#   NOTE(review): assumes this method is non-const and that the table is
#   already owned by a shared_ptr -- confirm.
diff --git a/be/src/olap/olap_table.cpp b/be/src/olap/olap_table.cpp
index 47e1f1f7..f7be7305 100644
--- a/be/src/olap/olap_table.cpp
+++ b/be/src/olap/olap_table.cpp
@@ -28,7 +28,7 @@
 #include <boost/filesystem.hpp>
 
 #include "olap/field.h"
-#include "olap/i_data.h"
+#include "olap/column_data.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/olap_engine.h"
@@ -41,7 +41,7 @@
 #include "olap/olap_header_manager.h"
 #include "olap/olap_engine.h"
 #include "olap/utils.h"
-#include "olap/writer.h"
+#include "olap/data_writer.h"
 
 using std::pair;
 using std::map;
@@ -91,6 +91,44 @@ OLAPTablePtr OLAPTable::create_from_header_file(
     return create_from_header(olap_header, store);
 }
 
+OLAPTablePtr OLAPTable::create_from_header_file_for_check(
+        TTabletId tablet_id, TSchemaHash schema_hash, const string& header_file) {
+    OLAPHeader* olap_header = NULL;
+
+    olap_header = new(nothrow) OLAPHeader(header_file);
+    if (olap_header == NULL) {
+        OLAP_LOG_WARNING("fail to malloc OLAPHeader.");
+        return NULL;
+    }
+
+    if (olap_header->load_for_check() != OLAP_SUCCESS) {
+        OLAP_LOG_WARNING("fail to load header. [header_file=%s]", header_file.c_str());
+        delete olap_header;
+        return NULL;
+    }
+
+    // ~OLAPTable() returns early for check-only tables and never frees
+    // _header, so the deleter below releases olap_header after the table.
+    // Allocation failure surfaces as std::bad_alloc, never as a NULL
+    // pointer, so there is no NULL check on the result.
+    OLAPTablePtr olap_table(new OLAPTable(olap_header),
+            [olap_header](OLAPTable* table) { delete table; delete olap_header; });
+    olap_table->_tablet_id = tablet_id;
+    olap_table->_schema_hash = schema_hash;
+    olap_table->_full_name = std::to_string(tablet_id) + "." + std::to_string(schema_hash);
+    return olap_table;
+}
+
+OLAPTable::OLAPTable(OLAPHeader* header)
+        : _header(header) {
+    if (header->has_tablet_id()) {
+        _tablet_id =  header->tablet_id();
+        _schema_hash = header->schema_hash();
+        _full_name = std::to_string(header->tablet_id()) + "." + std::to_string(header->schema_hash());
+    }
+    _table_for_check = true;
+}
+
 OLAPTablePtr OLAPTable::create_from_header(
         OLAPHeader* header,
         OlapStore* store) {
@@ -197,9 +235,14 @@ OLAPTable::OLAPTable(OLAPHeader* header, OlapStore* store) :
     _tablet_path = tablet_path_stream.str();
     _storage_root_path = store->path();
     _full_name = std::to_string(header->tablet_id()) + "." + std::to_string(header->schema_hash());
+    _table_for_check = false;
 }
 
 OLAPTable::~OLAPTable() {
+    if (_table_for_check) {
+        return;
+    }
+
     if (_header == NULL) {
         return;  // for convenience of mock test.
     }
@@ -405,7 +448,7 @@ OLAPStatus OLAPTable::select_versions_to_span( const Version& version,
     return res;
 }
 
-void OLAPTable::acquire_data_sources(const Version& version, vector<IData*>* sources) const {
+void OLAPTable::acquire_data_sources(const Version& version, vector<ColumnData*>* sources) const {
     vector<Version> span_versions;
 
     if (_header->select_versions_to_span(version, &span_versions) != OLAP_SUCCESS) {
@@ -419,7 +462,7 @@ void OLAPTable::acquire_data_sources(const Version& version, vector<IData*>* sou
 }
 
 void OLAPTable::acquire_data_sources_by_versions(const vector<Version>& version_list,
-                                                 vector<IData*>* sources) const {
+                                                 vector<ColumnData*>* sources) const {
     if (sources == NULL) {
         LOG(WARNING) << "output parameter for data sources is null. table=" << full_name();
         return;
@@ -440,7 +483,7 @@ void OLAPTable::acquire_data_sources_by_versions(const vector<Version>& version_
         }
 
         for (Rowset* rowset : it2->second) {
-            IData* olap_data = IData::create(rowset);
+            ColumnData* olap_data = ColumnData::create(rowset);
             if (olap_data == NULL) {
                 LOG(WARNING) << "fail to malloc Data. [version='" << it1->first
                     << "-" << it1->second << "' table='" << full_name() << "']";
@@ -460,7 +503,7 @@ void OLAPTable::acquire_data_sources_by_versions(const vector<Version>& version_
     }
 }
 
-OLAPStatus OLAPTable::release_data_sources(vector<IData*>* data_sources) const {
+OLAPStatus OLAPTable::release_data_sources(vector<ColumnData*>* data_sources) const {
     if (data_sources == NULL) {
         LOG(WARNING) << "parameter data_sources is null. [table='" << full_name() << "']";
         return OLAP_ERR_INPUT_PARAMETER_ERROR;
@@ -2193,7 +2236,8 @@ OLAPStatus OLAPTable::recover_tablet_until_specfic_version(
     for (Version& missing_version : missing_versions) {
         Rowset* rowset = new Rowset(this, missing_version, version_hash, false, 0, 0);
         rowset->set_empty(true);
-        IWriter* writer = IWriter::create(std::shared_ptr<OLAPTable>(this), rowset, true);
+        ColumnDataWriter* writer = ColumnDataWriter::create(shared_from_this(), rowset, true);
+        if (writer == NULL) { res = OLAP_ERR_MALLOC_ERROR; break; }
         if (res != OLAP_SUCCESS) { break; }
 
         res = writer->finalize();
# Review: _table_for_check now has an in-class initializer. ~OLAPTable() reads
# it before anything else, so a constructor path that does not assign it no
# longer leaves the destructor reading an indeterminate bool.
diff --git a/be/src/olap/olap_table.h b/be/src/olap/olap_table.h
index 9b5022a5..d823d086 100644
--- a/be/src/olap/olap_table.h
+++ b/be/src/olap/olap_table.h
@@ -36,7 +36,7 @@
 
 namespace doris {
 class FieldInfo;
-class IData;
+class ColumnData;
 class OLAPHeader;
 class Rowset;
 class OLAPTable;
@@ -99,12 +99,17 @@ class OLAPTable : public std::enable_shared_from_this<OLAPTable> {
             TSchemaHash schema_hash,
             const std::string& header_file,
             OlapStore* store = nullptr);
+    static OLAPTablePtr create_from_header_file_for_check(
+            TTabletId tablet_id,
+            TSchemaHash schema_hash,
+            const std::string& header_file);
 
     static OLAPTablePtr create_from_header(
             OLAPHeader* header,
             OlapStore* store = nullptr);
 
     explicit OLAPTable(OLAPHeader* header, OlapStore* store);
+    explicit OLAPTable(OLAPHeader* header);
 
     virtual ~OLAPTable();
 
@@ -137,7 +142,7 @@ class OLAPTable : public std::enable_shared_from_this<OLAPTable> {
     //      OLAPData:0-100      +
     //      OLAPData:101-110    +
     //      OLAPData:110-110    -
-    void acquire_data_sources(const Version& version, std::vector<IData*>* sources) const;
+    void acquire_data_sources(const Version& version, std::vector<ColumnData*>* sources) const;
 
     // Acquire data sources whose versions are specified by version_list.
     // If you want specified OLAPDatas instead of calling
@@ -147,10 +152,10 @@ class OLAPTable : public std::enable_shared_from_this<OLAPTable> {
     // @param [in] version_list
     // @param [out] sources
     void acquire_data_sources_by_versions(const std::vector<Version>& version_list,
-                                          std::vector<IData*>* sources) const;
+                                          std::vector<ColumnData*>* sources) const;
 
     // Releases the acquired data sources. Returns true on success.
-    OLAPStatus release_data_sources(std::vector<IData*>* data_sources) const;
+    OLAPStatus release_data_sources(std::vector<ColumnData*>* data_sources) const;
 
     // Registers a newly created data source, making it available for
     // querying.  Adds a reference to the data source in the header file.
@@ -741,6 +746,9 @@ class OLAPTable : public std::enable_shared_from_this<OLAPTable> {
     Mutex _load_lock;
     std::string _tablet_path;
 
+    // True only for tables built by OLAPTable(OLAPHeader*); ~OLAPTable() returns immediately for them.
+    bool _table_for_check = false;
+
     DISALLOW_COPY_AND_ASSIGN(OLAPTable);
 };
 
# Review: file moved from olap/column_file/ to olap/. The visible hunks only
# update include paths and remove the column_file namespace.
diff --git a/be/src/olap/column_file/out_stream.cpp b/be/src/olap/out_stream.cpp
similarity index 98%
rename from be/src/olap/column_file/out_stream.cpp
rename to be/src/olap/out_stream.cpp
index 260c25ff..b581d249 100644
--- a/be/src/olap/column_file/out_stream.cpp
+++ b/be/src/olap/out_stream.cpp
@@ -15,15 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/out_stream.h"
+#include "olap/out_stream.h"
 
-#include "olap/column_file/byte_buffer.h"
+#include "olap/byte_buffer.h"
 #include "olap/file_helper.h"
 #include "olap/utils.h"
 #include "util/mem_util.hpp"
 
 namespace doris {
-namespace column_file {
 
 OutStreamFactory::OutStreamFactory(CompressKind compress_kind, uint32_t stream_buffer_size) : 
         _compress_kind(compress_kind),
@@ -415,5 +414,4 @@ uint32_t OutStream::crc32(uint32_t checksum) const {
     return result;
 }
 
-}  // namespace column_file
 }  // namespace doris
# Review: file moved from olap/column_file/ to olap/ and the column_file
# namespace removed. The include guard (unchanged context) still reads
# DORIS_BE_SRC_OLAP_COLUMN_FILE_OUT_STREAM_H, which is stale but harmless.
# The Chinese comment in the context says: unlike ORC files, there is no HDFS
# underneath to guarantee data reliability, so a checksum must be written and
# then verified when the data is read.
diff --git a/be/src/olap/column_file/out_stream.h b/be/src/olap/out_stream.h
similarity index 96%
rename from be/src/olap/column_file/out_stream.h
rename to be/src/olap/out_stream.h
index 2b2e4ebd..07ff7255 100644
--- a/be/src/olap/column_file/out_stream.h
+++ b/be/src/olap/out_stream.h
@@ -18,15 +18,14 @@
 #ifndef DORIS_BE_SRC_OLAP_COLUMN_FILE_OUT_STREAM_H
 #define DORIS_BE_SRC_OLAP_COLUMN_FILE_OUT_STREAM_H
 
-#include "olap/column_file/byte_buffer.h"
-#include "olap/column_file/compress.h"
-#include "olap/column_file/stream_index_writer.h"
-#include "olap/column_file/stream_name.h"
+#include "olap/byte_buffer.h"
+#include "olap/compress.h"
+#include "olap/stream_index_writer.h"
+#include "olap/stream_name.h"
 #include "olap/olap_define.h"
 
 namespace doris {
 class FileHandler;
-namespace column_file {
 
 // 与OrcFile不同,我们底层没有HDFS无法保证存储数据的可靠性,所以必须写入
 // 校验值,在读取数据的时候检验这一校验值
@@ -183,6 +182,5 @@ class OutStreamBufferWrapper : public std::streambuf {
 };
 */
 
-}  // namespace column_file
 }  // namespace doris
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_OUT_STREAM_H
# Review: IWriter -> ColumnDataWriter rename inside PushHandler::_convert().
# The NULL check on the newly created writer is kept as-is.
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index d88f217e..89cb0322 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -534,7 +534,7 @@ OLAPStatus PushHandler::_convert(
     RowCursor row;
     BinaryFile raw_file;
     IBinaryReader* reader = NULL;
-    IWriter* writer = NULL;
+    ColumnDataWriter* writer = NULL;
     Rowset* delta_rowset = NULL;
     uint32_t  num_rows = 0;
 
@@ -620,7 +620,7 @@ OLAPStatus PushHandler::_convert(
                        curr_olap_table->full_name().c_str(),
                        curr_olap_table->num_rows_per_row_block());
 
-        if (NULL == (writer = IWriter::create(curr_olap_table, delta_rowset, true))) {
+        if (NULL == (writer = ColumnDataWriter::create(curr_olap_table, delta_rowset, true))) {
             OLAP_LOG_WARNING("fail to create writer. [table='%s']",
                              curr_olap_table->full_name().c_str());
             res = OLAP_ERR_MALLOC_ERROR;
# Review: switches the include to data_writer.h and retypes the DataSources
# typedef to std::vector<ColumnData*>.
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index 4c4cf484..920a7d4f 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -29,11 +29,11 @@
 #include "olap/olap_common.h"
 #include "olap/rowset.h"
 #include "olap/row_cursor.h"
-#include "olap/writer.h"
+#include "olap/data_writer.h"
 
 namespace doris {
 
-typedef std::vector<IData*> DataSources;
+typedef std::vector<ColumnData*> DataSources;
 typedef std::vector<Rowset*> Indices;
 
 class BinaryFile;
# Review: IData -> ColumnData rename in CollectIterator (add_child and the
# ChildCtx constructor/member) and in Reader::_acquire_data_sources(). The
# include moves from olap_data.h to column_data.h.
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index ed66ec22..23b4f906 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -17,7 +17,7 @@
 
 #include "olap/reader.h"
 
-#include "olap/olap_data.h"
+#include "olap/column_data.h"
 #include "olap/olap_table.h"
 #include "olap/row_block.h"
 #include "olap/row_cursor.h"
@@ -45,7 +45,7 @@ class CollectIterator {
     // set reverse to true if need read in reverse order.
     OLAPStatus init(Reader* reader);
 
-    OLAPStatus add_child(IData* data, RowBlock* block);
+    OLAPStatus add_child(ColumnData* data, RowBlock* block);
 
     // Get top row of the heap, NULL if reach end.
     const RowCursor* current_row(bool* delete_flag) const {
@@ -65,7 +65,7 @@ class CollectIterator {
 private:
     class ChildCtx {
     public:
-        ChildCtx(IData* data, RowBlock* block, Reader* reader)
+        ChildCtx(ColumnData* data, RowBlock* block, Reader* reader)
                 : _data(data),
                 _is_delete(data->delete_flag()),
                 _reader(reader),
@@ -134,7 +134,7 @@ class CollectIterator {
             return OLAP_ERR_DATA_EOF;
         }
 
-        IData* _data = nullptr;
+        ColumnData* _data = nullptr;
         const RowCursor* _current_row = nullptr;
         bool _is_delete = false;
         Reader* _reader;
@@ -187,7 +187,7 @@ OLAPStatus CollectIterator::init(Reader* reader) {
     return OLAP_SUCCESS;
 }
 
-OLAPStatus CollectIterator::add_child(IData* data, RowBlock* block) {
+OLAPStatus CollectIterator::add_child(ColumnData* data, RowBlock* block) {
     std::unique_ptr<ChildCtx> child(new ChildCtx(data, block, _reader));
     RETURN_NOT_OK(child->init());
     if (child->current_row() == nullptr) {
@@ -478,7 +478,7 @@ void Reader::close() {
 }
 
 OLAPStatus Reader::_acquire_data_sources(const ReaderParams& read_params) {
-    const std::vector<IData*>* data_sources;
+    const std::vector<ColumnData*>* data_sources;
     if (read_params.reader_type == READER_ALTER_TABLE
             || read_params.reader_type == READER_BASE_COMPACTION
             || read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
# Review: ReaderParams::olap_data_arr and Reader's _own_data_sources /
# _data_sources now hold ColumnData* instead of IData*.
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 83fca549..0eb6598b 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -57,8 +57,8 @@ struct ReaderParams {
     std::vector<OlapTuple> start_key;
     std::vector<OlapTuple> end_key;
     std::vector<TCondition> conditions;
-    // The IData will be set when using Merger, eg Cumulative, BE.
-    std::vector<IData*> olap_data_arr;
+    // The ColumnData will be set when using Merger, eg Cumulative, BE.
+    std::vector<ColumnData*> olap_data_arr;
     std::vector<uint32_t> return_columns;
     RuntimeProfile* profile;
     RuntimeState* runtime_state;
@@ -202,8 +202,8 @@ class Reader {
 
     // _own_data_sources is data source that reader aquire from olap_table, so we need to
     // release these when reader closing
-    std::vector<IData*> _own_data_sources;
-    std::vector<IData*> _data_sources;
+    std::vector<ColumnData*> _own_data_sources;
+    std::vector<ColumnData*> _data_sources;
 
     KeysParam _keys_param;
     int32_t _next_key_index;
# Review: column_data.h replaces olap_data.h, and ColumnDataHeaderMessage is no
# longer qualified with the removed column_file namespace.
diff --git a/be/src/olap/rowset.cpp b/be/src/olap/rowset.cpp
index 8aa173e4..5b9074ed 100644
--- a/be/src/olap/rowset.cpp
+++ b/be/src/olap/rowset.cpp
@@ -22,7 +22,7 @@
 #include <cmath>
 #include <fstream>
 
-#include "olap/olap_data.h"
+#include "olap/column_data.h"
 #include "olap/olap_table.h"
 #include "olap/row_block.h"
 #include "olap/row_cursor.h"
@@ -289,7 +289,7 @@ OLAPStatus Rowset::load() {
 OLAPStatus Rowset::load_pb(const char* file, uint32_t seg_id) {
     OLAPStatus res = OLAP_SUCCESS;
 
-    FileHeader<column_file::ColumnDataHeaderMessage> seg_file_header;
+    FileHeader<ColumnDataHeaderMessage> seg_file_header;
     FileHandler seg_file_handler;
     res = seg_file_handler.open(file, O_RDONLY);
     if (OLAP_SUCCESS != res) {
# Review: drops the column_file:: qualifier from ColumnDataHeaderMessage in
# get_seg_pb() and _seg_pb_map. Note that get_seg_pb() uses
# unordered_map::at(), which throws std::out_of_range for an unknown seg_id.
diff --git a/be/src/olap/rowset.h b/be/src/olap/rowset.h
index c24dadf5..9c8b82ce 100644
--- a/be/src/olap/rowset.h
+++ b/be/src/olap/rowset.h
@@ -222,7 +222,7 @@ class Rowset {
         return _index.get_row_block_position(pos, rbp);
     }
 
-    inline const FileHeader<column_file::ColumnDataHeaderMessage>* get_seg_pb(uint32_t seg_id) const {
+    inline const FileHeader<ColumnDataHeaderMessage>* get_seg_pb(uint32_t seg_id) const {
         return &(_seg_pb_map.at(seg_id));
     }
 
@@ -279,7 +279,7 @@ class Rowset {
     size_t _current_num_rows_per_row_block;
 
     std::vector<std::pair<WrapperField*, WrapperField*>> _column_statistics;
-    std::unordered_map<uint32_t, FileHeader<column_file::ColumnDataHeaderMessage> > _seg_pb_map;
+    std::unordered_map<uint32_t, FileHeader<ColumnDataHeaderMessage> > _seg_pb_map;
 
     DISALLOW_COPY_AND_ASSIGN(Rowset);
 };
# Review: fixes three typos that keep this header from compiling:
# - the include path "olap/ata_writer.h"; other files in this patch switch
#   "olap/writer.h" to "olap/data_writer.h";
# - the constructor name "RowsetBuil" (the class is RowsetBuilder);
# - "_writer(write)", which initializes the member from `write` instead of
#   the `writer` parameter.
# NOTE(review): ColumnDataWriter(is_push_write, table) is assumed to be a valid
# base-class constructor; its declaration is not shown here.
diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h
index 84505523..0b49b594 100644
--- a/be/src/olap/rowset_builder.h
+++ b/be/src/olap/rowset_builder.h
@@ -17,16 +17,16 @@
 
 #pragma once
 
-#include "olap/writer.h"
+#include "olap/data_writer.h"
 
 namespace doris {
 
 class Rowset;
 
-class RowsetBuilder : public IWriter {
+class RowsetBuilder : public ColumnDataWriter {
 public:
-    RowsetBuil(OLAPTablePtr table, Rowset* rowset, IWriter* writer, bool is_push_write)
-        : IWriter(is_push_write, table),
+    RowsetBuilder(OLAPTablePtr table, Rowset* rowset, ColumnDataWriter* writer, bool is_push_write)
+        : ColumnDataWriter(is_push_write, table),
         _rowset(rowset),
-        _writer(write) {
+        _writer(writer) {
     }
@@ -55,11 +55,11 @@ class RowsetBuilder : public IWriter {
     }
 
     Rowset* rowset() { return _rowset; }
-    IWriter* writer() { return _writer; }
+    ColumnDataWriter* writer() { return _writer; }
 
 private:
     Rowset* _rowset;
-    IWriter* _writer;
+    ColumnDataWriter* _writer;
 };
 
 }
# Review: file moved from olap/column_file/ to olap/. The visible hunks only
# update include paths and remove the column_file namespace.
diff --git a/be/src/olap/column_file/run_length_byte_reader.cpp b/be/src/olap/run_length_byte_reader.cpp
similarity index 95%
rename from be/src/olap/column_file/run_length_byte_reader.cpp
rename to be/src/olap/run_length_byte_reader.cpp
index 52bb982c..bfd7e61c 100644
--- a/be/src/olap/column_file/run_length_byte_reader.cpp
+++ b/be/src/olap/run_length_byte_reader.cpp
@@ -15,13 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/run_length_byte_reader.h"
+#include "olap/run_length_byte_reader.h"
 
-#include "olap/column_file/column_reader.h"
-#include "olap/column_file/in_stream.h"
+#include "olap/column_reader.h"
+#include "olap/in_stream.h"
 
 namespace doris {
-namespace column_file {
 
 RunLengthByteReader::RunLengthByteReader(ReadOnlyFileStream* input) : 
         _input(input),
@@ -146,5 +145,4 @@ OLAPStatus RunLengthByteReader::skip(uint64_t num_values) {
     return res;
 }
 
-}  // namespace column_file
 }  // namespace doris
# Review: file moved from olap/column_file/ to olap/ and the column_file
# namespace removed. The include guard (context lines) still uses the old
# DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_BYTE_READER_H name; stale but harmless.
diff --git a/be/src/olap/column_file/run_length_byte_reader.h b/be/src/olap/run_length_byte_reader.h
similarity index 92%
rename from be/src/olap/column_file/run_length_byte_reader.h
rename to be/src/olap/run_length_byte_reader.h
index e9e96164..f20e2a34 100644
--- a/be/src/olap/column_file/run_length_byte_reader.h
+++ b/be/src/olap/run_length_byte_reader.h
@@ -18,12 +18,11 @@
 #ifndef DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_BYTE_READER_H
 #define DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_BYTE_READER_H
 
-#include "olap/column_file/file_stream.h"
-#include "olap/column_file/run_length_byte_writer.h"
+#include "olap/file_stream.h"
+#include "olap/run_length_byte_writer.h"
 #include "olap/olap_define.h"
 
 namespace doris {
-namespace column_file {
 
 class ReadOnlyFileStream;
 class PositionProvider;
@@ -53,7 +52,6 @@ class RunLengthByteReader {
     DISALLOW_COPY_AND_ASSIGN(RunLengthByteReader);
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_BYTE_READER_H
# Review: file moved from olap/column_file/ to olap/. The visible hunks only
# update include paths and remove the column_file namespace.
diff --git a/be/src/olap/column_file/run_length_byte_writer.cpp b/be/src/olap/run_length_byte_writer.cpp
similarity index 96%
rename from be/src/olap/column_file/run_length_byte_writer.cpp
rename to be/src/olap/run_length_byte_writer.cpp
index 6893ab7f..fed2fa41 100644
--- a/be/src/olap/column_file/run_length_byte_writer.cpp
+++ b/be/src/olap/run_length_byte_writer.cpp
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/run_length_byte_writer.h"
+#include "olap/run_length_byte_writer.h"
 
-#include "olap/column_file/out_stream.h"
+#include "olap/out_stream.h"
 
 namespace doris {
-namespace column_file {
 
 const int32_t RunLengthByteWriter::MIN_REPEAT_SIZE;
 const int32_t RunLengthByteWriter::MAX_LITERAL_SIZE;
@@ -147,6 +146,5 @@ void RunLengthByteWriter::get_position(PositionEntryWriter* index_entry) const {
     //                "recorded position count: %d", index_entry->positions_size());
 }
 
-}  // namespace column_file
 }  // namespace doris
 
# Review: file moved from olap/column_file/ to olap/ and the column_file
# namespace removed. The include guard (context lines) still uses the old
# DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_BYTE_WRITER_H name; stale but harmless.
diff --git a/be/src/olap/column_file/run_length_byte_writer.h b/be/src/olap/run_length_byte_writer.h
similarity index 95%
rename from be/src/olap/column_file/run_length_byte_writer.h
rename to be/src/olap/run_length_byte_writer.h
index 1bdc6173..6002ed65 100644
--- a/be/src/olap/column_file/run_length_byte_writer.h
+++ b/be/src/olap/run_length_byte_writer.h
@@ -18,11 +18,10 @@
 #ifndef DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_BYTE_WRITER_H
 #define DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_BYTE_WRITER_H
 
-#include "olap/column_file/stream_index_writer.h"
+#include "olap/stream_index_writer.h"
 #include "olap/olap_define.h"
 
 namespace doris {
-namespace column_file {
 
 class OutStream;
 class RowIndexEntryMessage;
@@ -52,7 +51,6 @@ class RunLengthByteWriter {
     DISALLOW_COPY_AND_ASSIGN(RunLengthByteWriter);
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_BYTE_WRITER_H
# Review: file moved from olap/column_file/ to olap/. The visible hunks only
# update include paths and remove the column_file namespace.
diff --git a/be/src/olap/column_file/run_length_integer_reader.cpp b/be/src/olap/run_length_integer_reader.cpp
similarity index 98%
rename from be/src/olap/column_file/run_length_integer_reader.cpp
rename to be/src/olap/run_length_integer_reader.cpp
index a99d1a90..0d1d000d 100644
--- a/be/src/olap/column_file/run_length_integer_reader.cpp
+++ b/be/src/olap/run_length_integer_reader.cpp
@@ -15,14 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/run_length_integer_reader.h"
+#include "olap/run_length_integer_reader.h"
 
-#include "olap/column_file/column_reader.h"
-#include "olap/column_file/in_stream.h"
-#include "olap/column_file/serialize.h"
+#include "olap/column_reader.h"
+#include "olap/in_stream.h"
+#include "olap/serialize.h"
 
 namespace doris {
-namespace column_file {
 
 RunLengthIntegerReader::RunLengthIntegerReader(ReadOnlyFileStream* input, bool is_singed)
       : _input(input),
@@ -434,6 +433,5 @@ OLAPStatus RunLengthIntegerReader::skip(uint64_t num_values) {
     return res;
 }
 
-}  // namespace column_file
 }  // namespace doris
 
# Review: file moved from olap/column_file/ to olap/ and the column_file
# namespace removed. The include guard (context lines) still uses the old
# DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_INTEGER_READER_H name; stale but harmless.
diff --git a/be/src/olap/column_file/run_length_integer_reader.h b/be/src/olap/run_length_integer_reader.h
similarity index 92%
rename from be/src/olap/column_file/run_length_integer_reader.h
rename to be/src/olap/run_length_integer_reader.h
index 912534e2..1e16f8b9 100644
--- a/be/src/olap/column_file/run_length_integer_reader.h
+++ b/be/src/olap/run_length_integer_reader.h
@@ -18,14 +18,13 @@
 #ifndef DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_INTEGER_READER_H
 #define DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_INTEGER_READER_H
 
-#include "olap/column_file/file_stream.h"
-#include "olap/column_file/run_length_integer_writer.h"
-#include "olap/column_file/stream_index_reader.h"
+#include "olap/file_stream.h"
+#include "olap/run_length_integer_writer.h"
+#include "olap/stream_index_reader.h"
 #include "olap/olap_define.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
-namespace column_file {
 
 class ReadOnlyFileStream;
 class PositionProvider;
@@ -73,7 +72,6 @@ class RunLengthIntegerReader {
     DISALLOW_COPY_AND_ASSIGN(RunLengthIntegerReader);
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_INTEGER_READER_H
# Review: file moved from olap/column_file/ to olap/. The visible hunks only
# update include paths and remove the column_file namespace.
diff --git a/be/src/olap/column_file/run_length_integer_writer.cpp b/be/src/olap/run_length_integer_writer.cpp
similarity index 99%
rename from be/src/olap/column_file/run_length_integer_writer.cpp
rename to be/src/olap/run_length_integer_writer.cpp
index 89f521c0..fca56b55 100644
--- a/be/src/olap/column_file/run_length_integer_writer.cpp
+++ b/be/src/olap/run_length_integer_writer.cpp
@@ -15,15 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/run_length_integer_writer.h"
+#include "olap/run_length_integer_writer.h"
 
 #include <cmath>
 
-#include "olap/column_file/out_stream.h"
-#include "olap/column_file/serialize.h"
+#include "olap/out_stream.h"
+#include "olap/serialize.h"
 
 namespace doris {
-namespace column_file {
 
 RunLengthIntegerWriter::RunLengthIntegerWriter(OutStream* output, bool is_singed) : 
         _output(output),
@@ -731,6 +730,5 @@ void RunLengthIntegerWriter::get_position(PositionEntryWriter* index_entry, bool
     }
 }
 
-}  // namespace column_file
 }  // namespace doris
 
# Review: file moved from olap/column_file/ to olap/ and the column_file
# namespace removed. The trailing #endif comment (context) still names the old
# DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_INTEGER_WRITER_H guard; stale but harmless.
diff --git a/be/src/olap/column_file/run_length_integer_writer.h b/be/src/olap/run_length_integer_writer.h
similarity index 99%
rename from be/src/olap/column_file/run_length_integer_writer.h
rename to be/src/olap/run_length_integer_writer.h
index 330c128c..78f7f618 100644
--- a/be/src/olap/column_file/run_length_integer_writer.h
+++ b/be/src/olap/run_length_integer_writer.h
@@ -20,12 +20,11 @@
 
 #include <endian.h>
 
-#include "olap/column_file/out_stream.h"
+#include "olap/out_stream.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 
 namespace doris {
-namespace column_file {
 
 class OutStream;
 class RowIndexEntryMessage;
@@ -295,7 +294,6 @@ class RunLengthIntegerWriter {
     DISALLOW_COPY_AND_ASSIGN(RunLengthIntegerWriter);
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_RUN_LENGTH_INTEGER_WRITER_H
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 17888c1c..f771f657 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -23,14 +23,14 @@
 #include <algorithm>
 #include <vector>
 
-#include "olap/i_data.h"
+#include "olap/column_data.h"
 #include "olap/merger.h"
-#include "olap/olap_data.h"
+#include "olap/column_data.h"
 #include "olap/olap_engine.h"
 #include "olap/olap_table.h"
 #include "olap/row_block.h"
 #include "olap/row_cursor.h"
-#include "olap/writer.h"
+#include "olap/data_writer.h"
 #include "olap/wrapper_field.h"
 #include "common/resource_tls.h"
 #include "agent/cgroups_mgr.h"
@@ -545,7 +545,7 @@ RowBlockMerger::~RowBlockMerger() {}
 
 bool RowBlockMerger::merge(
         const vector<RowBlock*>& row_block_arr,
-        IWriter* writer,
+        ColumnDataWriter* writer,
         uint64_t* merged_rows) {
     uint64_t tmp_merged_rows = 0;
     RowCursor row_cursor;
@@ -670,7 +670,7 @@ SchemaChangeDirectly::~SchemaChangeDirectly() {
     SAFE_DELETE(_dst_cursor);
 }
 
-bool SchemaChangeDirectly::_write_row_block(IWriter* writer, RowBlock* row_block) {
+bool SchemaChangeDirectly::_write_row_block(ColumnDataWriter* writer, RowBlock* row_block) {
     for (uint32_t i = 0; i < row_block->row_block_info().row_num; i++) {
         if (OLAP_SUCCESS != writer->attached_by(_dst_cursor)) {
             OLAP_LOG_WARNING("fail to attach writer");
@@ -686,7 +686,7 @@ bool SchemaChangeDirectly::_write_row_block(IWriter* writer, RowBlock* row_block
     return true;
 }
 
-bool LinkedSchemaChange::process(IData* olap_data, Rowset* new_rowset) {
+bool LinkedSchemaChange::process(ColumnData* olap_data, Rowset* new_rowset) {
     for (size_t i = 0; i < olap_data->olap_index()->num_segments(); ++i) {
         string index_path = new_rowset->construct_index_file_path(new_rowset->rowset_id(), i);
         string base_table_index_path = olap_data->olap_index()->construct_index_file_path(olap_data->olap_index()->rowset_id(), i);
@@ -728,7 +728,7 @@ bool LinkedSchemaChange::process(IData* olap_data, Rowset* new_rowset) {
     return true;
 }
 
-bool SchemaChangeDirectly::process(IData* olap_data, Rowset* new_rowset) {
+bool SchemaChangeDirectly::process(ColumnData* olap_data, Rowset* new_rowset) {
     DataFileType data_file_type = new_rowset->table()->data_file_type();
     bool null_supported = true;
 
@@ -802,7 +802,7 @@ bool SchemaChangeDirectly::process(IData* olap_data, Rowset* new_rowset) {
         << "block_row_size=" << _olap_table->num_rows_per_row_block();
     bool result = true;
     RowBlock* new_row_block = NULL;
-    IWriter* writer = IWriter::create(_olap_table, new_rowset, false);
+    ColumnDataWriter* writer = ColumnDataWriter::create(_olap_table, new_rowset, false);
     if (NULL == writer) {
         OLAP_LOG_WARNING("failed to create writer.");
         result = false;
@@ -917,7 +917,7 @@ SchemaChangeWithSorting::~SchemaChangeWithSorting() {
     SAFE_DELETE(_row_block_allocator);
 }
 
-bool SchemaChangeWithSorting::process(IData* olap_data, Rowset* new_rowset) {
+bool SchemaChangeWithSorting::process(ColumnData* olap_data, Rowset* new_rowset) {
     if (NULL == _row_block_allocator) {
         if (NULL == (_row_block_allocator = new(nothrow) RowBlockAllocator(
                         _olap_table->tablet_schema(), _memory_limitation))) {
@@ -1120,7 +1120,7 @@ bool SchemaChangeWithSorting::process(IData* olap_data, Rowset* new_rowset) {
 bool SchemaChangeWithSorting::_internal_sorting(const vector<RowBlock*>& row_block_arr,
                                                 const Version& temp_delta_versions,
                                                 Rowset** temp_rowset) {
-    IWriter* writer = NULL;
+    ColumnDataWriter* writer = NULL;
     uint64_t merged_rows = 0;
     RowBlockMerger merger(_olap_table);
 
@@ -1138,7 +1138,7 @@ bool SchemaChangeWithSorting::_internal_sorting(const vector<RowBlock*>& row_blo
                    _olap_table->full_name().c_str(),
                    _olap_table->num_rows_per_row_block());
 
-    writer = IWriter::create(_olap_table, *temp_rowset, false);
+    writer = ColumnDataWriter::create(_olap_table, *temp_rowset, false);
     if (NULL == writer) {
         OLAP_LOG_WARNING("failed to create writer.");
         goto INTERNAL_SORTING_ERR;
@@ -1173,13 +1173,13 @@ bool SchemaChangeWithSorting::_external_sorting(
 
     uint64_t merged_rows = 0;
     uint64_t filted_rows = 0;
-    vector<IData*> olap_data_arr;
+    vector<ColumnData*> olap_data_arr;
 
     for (vector<Rowset*>::iterator it = src_rowsets.begin();
             it != src_rowsets.end(); ++it) {
-        IData* olap_data = IData::create(*it);
+        ColumnData* olap_data = ColumnData::create(*it);
         if (NULL == olap_data) {
-            OLAP_LOG_WARNING("fail to create IData.");
+            OLAP_LOG_WARNING("fail to create ColumnData.");
             goto EXTERNAL_SORTING_ERR;
         }
 
@@ -1212,7 +1212,7 @@ bool SchemaChangeWithSorting::_external_sorting(
         goto EXTERNAL_SORTING_ERR;
     }
 
-    for (vector<IData*>::iterator it = olap_data_arr.begin();
+    for (vector<ColumnData*>::iterator it = olap_data_arr.begin();
             it != olap_data_arr.end(); ++it) {
         SAFE_DELETE(*it);
     }
@@ -1220,7 +1220,7 @@ bool SchemaChangeWithSorting::_external_sorting(
     return true;
 
 EXTERNAL_SORTING_ERR:
-    for (vector<IData*>::iterator it = olap_data_arr.begin();
+    for (vector<ColumnData*>::iterator it = olap_data_arr.begin();
             it != olap_data_arr.end(); ++it) {
         SAFE_DELETE(*it);
     }
@@ -1536,7 +1536,7 @@ OLAPStatus SchemaChangeHandler::_do_alter_table(
     }
 
     vector<Version> versions_to_be_changed;
-    vector<IData*> olap_data_arr;
+    vector<ColumnData*> olap_data_arr;
     // delete handlers for new olap table
     DeleteHandler delete_handler;
     do {
@@ -1798,12 +1798,12 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
     }
 
     // c. 转换数据
-    IData* olap_data = NULL;
+    ColumnData* olap_data = NULL;
     for (vector<Rowset*>::iterator it = ref_rowsets->begin();
             it != ref_rowsets->end(); ++it) {
-        IData* olap_data = IData::create(*it);
+        ColumnData* olap_data = ColumnData::create(*it);
         if (NULL == olap_data) {
-            OLAP_LOG_WARNING("fail to create IData.");
+            OLAP_LOG_WARNING("fail to create ColumnData.");
             res = OLAP_ERR_MALLOC_ERROR;
             goto SCHEMA_VERSION_CONVERT_ERR;
         }
@@ -2020,7 +2020,7 @@ OLAPStatus SchemaChangeHandler::_alter_table(SchemaChangeParams* sc_params) {
     }
 
     // c. 转换历史数据
-    for (vector<IData*>::iterator it = sc_params->ref_olap_data_arr.end() - 1;
+    for (vector<ColumnData*>::iterator it = sc_params->ref_olap_data_arr.end() - 1;
             it >= sc_params->ref_olap_data_arr.begin(); --it) {
         OLAP_LOG_TRACE("begin to convert a history delta. [version='%d-%d']",
                        (*it)->version().first, (*it)->version().second);
@@ -2037,7 +2037,7 @@ OLAPStatus SchemaChangeHandler::_alter_table(SchemaChangeParams* sc_params) {
                 sc_params->ref_olap_table->schema_hash(),
                 (*it)->version().second);
 
-        // we create a new delta with the same version as the IData processing currently.
+        // we create a new delta with the same version as the ColumnData processing currently.
         Rowset* new_rowset = new(nothrow) Rowset(
                                             sc_params->new_olap_table.get(),
                                             (*it)->version(),
@@ -2159,8 +2159,8 @@ OLAPStatus SchemaChangeHandler::_alter_table(SchemaChangeParams* sc_params) {
                        (*it)->version().first,
                        (*it)->version().second);
 
-        // 释放IData
-        vector<IData*> olap_data_to_be_released(it, it + 1);
+        // 释放ColumnData
+        vector<ColumnData*> olap_data_to_be_released(it, it + 1);
         sc_params->ref_olap_table->release_data_sources(&olap_data_to_be_released);
 
         it = sc_params->ref_olap_data_arr.erase(it); // after erasing, it will point to end()
@@ -2389,7 +2389,7 @@ OLAPStatus SchemaChange::create_init_version(
                    version.first, version.second);
 
     OLAPTablePtr table;
-    IWriter* writer = NULL;
+    ColumnDataWriter* writer = NULL;
     OLAPStatus res = OLAP_SUCCESS;
 
     do {
@@ -2409,7 +2409,7 @@ OLAPStatus SchemaChange::create_init_version(
         }
 
         // Create writer, which write nothing to table, to generate empty data file
-        writer = IWriter::create(table, rowset, false);
+        writer = ColumnDataWriter::create(table, rowset, false);
         if (writer == NULL) {
             LOG(WARNING) << "fail to create writer. [table=" << table->full_name() << "]";
             res = OLAP_ERR_MALLOC_ERROR;
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index b3f3a598..7fded321 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -24,14 +24,14 @@
 
 #include "gen_cpp/AgentService_types.h"
 #include "olap/delete_handler.h"
-#include "olap/i_data.h"
+#include "olap/column_data.h"
 
 namespace doris {
 // defined in 'field.h'
 class Field;
 class FieldInfo;
 // defined in 'olap_data.h'
-class IData;
+class ColumnData;
 // defined in 'olap_table.h'
 class OLAPTable;
 // defined in 'row_block.h'
@@ -39,7 +39,7 @@ class RowBlock;
 // defined in 'row_cursor.h'
 class RowCursor;
 // defined in 'writer.h'
-class IWriter;
+class ColumnDataWriter;
 
 struct ColumnMapping {
     ColumnMapping() : ref_column(-1), default_value(NULL) {}
@@ -124,7 +124,7 @@ class RowBlockMerger {
 
     bool merge(
             const std::vector<RowBlock*>& row_block_arr,
-            IWriter* writer,
+            ColumnDataWriter* writer,
             uint64_t* merged_rows);
 
 private:
@@ -150,7 +150,7 @@ class SchemaChange {
     SchemaChange() : _filted_rows(0), _merged_rows(0) {}
     virtual ~SchemaChange() {}
 
-    virtual bool process(IData* olap_data, Rowset* new_olap_index) = 0;
+    virtual bool process(ColumnData* olap_data, Rowset* new_olap_index) = 0;
 
     void add_filted_rows(uint64_t filted_rows) {
         _filted_rows += filted_rows;
@@ -195,7 +195,7 @@ class LinkedSchemaChange : public SchemaChange {
                 OLAPTablePtr new_olap_table);
     ~LinkedSchemaChange() {}
 
-    bool process(IData* olap_data, Rowset* new_olap_index);
+    bool process(ColumnData* olap_data, Rowset* new_olap_index);
 private:
     OLAPTablePtr _base_olap_table;
     OLAPTablePtr _new_olap_table;
@@ -212,7 +212,7 @@ class SchemaChangeDirectly : public SchemaChange {
             const RowBlockChanger& row_block_changer);
     virtual ~SchemaChangeDirectly();
 
-    virtual bool process(IData* olap_data, Rowset* new_olap_index);
+    virtual bool process(ColumnData* olap_data, Rowset* new_olap_index);
 
 private:
     OLAPTablePtr _olap_table;
@@ -221,7 +221,7 @@ class SchemaChangeDirectly : public SchemaChange {
     RowCursor* _src_cursor;
     RowCursor* _dst_cursor;
 
-    bool _write_row_block(IWriter* writer, RowBlock* row_block);
+    bool _write_row_block(ColumnDataWriter* writer, RowBlock* row_block);
 
     DISALLOW_COPY_AND_ASSIGN(SchemaChangeDirectly);
 };
@@ -235,7 +235,7 @@ class SchemaChangeWithSorting : public SchemaChange {
             size_t memory_limitation);
     virtual ~SchemaChangeWithSorting();
 
-    virtual bool process(IData* olap_data, Rowset* new_olap_index);
+    virtual bool process(ColumnData* olap_data, Rowset* new_olap_index);
 
 private:
     bool _internal_sorting(
@@ -317,7 +317,7 @@ class SchemaChangeHandler {
         AlterTabletType alter_table_type;
         OLAPTablePtr ref_olap_table;
         OLAPTablePtr new_olap_table;
-        std::vector<IData*> ref_olap_data_arr;
+        std::vector<ColumnData*> ref_olap_data_arr;
         std::string debug_message;
         DeleteHandler delete_handler;
         // TODO(zc): fuck me please, I don't add mutable here, but no where
diff --git a/be/src/olap/column_file/segment_reader.cpp b/be/src/olap/segment_reader.cpp
similarity index 99%
rename from be/src/olap/column_file/segment_reader.cpp
rename to be/src/olap/segment_reader.cpp
index 7c699aa8..8357ae42 100644
--- a/be/src/olap/column_file/segment_reader.cpp
+++ b/be/src/olap/segment_reader.cpp
@@ -15,21 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/segment_reader.h"
+#include "olap/segment_reader.h"
 
 #include <sys/mman.h>
 
 #include <istream>
 
-#include "olap/column_file/file_stream.h"
-#include "olap/column_file/in_stream.h"
-#include "olap/column_file/out_stream.h"
+#include "olap/file_stream.h"
+#include "olap/in_stream.h"
+#include "olap/out_stream.h"
 #include "olap/olap_cond.h"
 #include "olap/row_block.h"
 #include "olap/rowset.h"
 
 namespace doris {
-namespace column_file {
 
 static const uint32_t MIN_FILTER_BLOCK_NUM = 10;
 
@@ -121,7 +120,7 @@ OLAPStatus SegmentReader::_check_file_version() {
     }
 
     if (_header_message().version() > CURRENT_COLUMN_DATA_VERSION) {
-        OLAP_LOG_WARNING("this file may generated by olapengine of higher version. "
+        OLAP_LOG_WARNING("this file may generated by olap/ngine of higher version. "
                 "reading it would cause some unexpected error, [found version = %d]",
                 _header_message().version());
     }
@@ -146,7 +145,7 @@ OLAPStatus SegmentReader::_load_segment_file() {
 
     res = _check_file_version();
     if (OLAP_SUCCESS != res) {
-        OLAP_LOG_WARNING("file header corrupted or generated by higher version olapengine.");
+        OLAP_LOG_WARNING("file header corrupted or generated by higher version olap/ngine.");
         return res;
     }
 
@@ -952,5 +951,4 @@ OLAPStatus SegmentReader::_load_to_vectorized_row_batch(
     return OLAP_SUCCESS;
 }
 
-}  // namespace column_file
 }  //unamespace doris
diff --git a/be/src/olap/column_file/segment_reader.h b/be/src/olap/segment_reader.h
similarity index 97%
rename from be/src/olap/column_file/segment_reader.h
rename to be/src/olap/segment_reader.h
index d02ed305..9c382c77 100644
--- a/be/src/olap/column_file/segment_reader.h
+++ b/be/src/olap/segment_reader.h
@@ -25,12 +25,12 @@
 #include <map>
 #include <string>
 
-#include "olap/column_file/bloom_filter_reader.h"
-#include "olap/column_file/column_reader.h"
-#include "olap/column_file/compress.h"
-#include "olap/column_file/file_stream.h"
-#include "olap/column_file/in_stream.h"
-#include "olap/column_file/stream_index_reader.h"
+#include "olap/bloom_filter_reader.h"
+#include "olap/column_reader.h"
+#include "olap/compress.h"
+#include "olap/file_stream.h"
+#include "olap/in_stream.h"
+#include "olap/stream_index_reader.h"
 #include "olap/delete_handler.h"
 #include "olap/file_helper.h"
 #include "olap/lru_cache.h"
@@ -48,7 +48,6 @@ namespace doris {
 
 class Rowset;
 
-namespace column_file {
 
 class ColumnReader;
 
@@ -363,7 +362,6 @@ class SegmentReader {
     DISALLOW_COPY_AND_ASSIGN(SegmentReader);
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_SEGMENT_READER_H
diff --git a/be/src/olap/column_file/segment_writer.cpp b/be/src/olap/segment_writer.cpp
similarity index 98%
rename from be/src/olap/column_file/segment_writer.cpp
rename to be/src/olap/segment_writer.cpp
index c4bcc246..3dae9690 100644
--- a/be/src/olap/column_file/segment_writer.cpp
+++ b/be/src/olap/segment_writer.cpp
@@ -15,16 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/segment_writer.h"
+#include "olap/segment_writer.h"
 
-#include "olap/column_file/column_writer.h"
-#include "olap/column_file/out_stream.h"
+#include "olap/column_writer.h"
+#include "olap/out_stream.h"
 #include "olap/file_helper.h"
 #include "olap/utils.h"
 
 
 namespace doris {
-namespace column_file {
 
 SegmentWriter::SegmentWriter(
         const std::string& file_name,
@@ -276,5 +275,4 @@ OLAPStatus SegmentWriter::finalize(uint32_t* segment_file_size) {
     return res;
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/segment_writer.h b/be/src/olap/segment_writer.h
similarity index 94%
rename from be/src/olap/column_file/segment_writer.h
rename to be/src/olap/segment_writer.h
index 7c7967bd..c6303888 100644
--- a/be/src/olap/column_file/segment_writer.h
+++ b/be/src/olap/segment_writer.h
@@ -19,16 +19,14 @@
 #define DORIS_BE_SRC_OLAP_COLUMN_FILE_SEGMENT_WRITER_H
 
 #include "olap/olap_define.h"
-#include "olap/writer.h"
+#include "olap/data_writer.h"
 
 namespace doris {
-namespace column_file {
 
 class ColumnWriter;
 class OutStreamFactory;
 class ColumnDataHeaderMessage;
 
-// 列文件格式的Writer,接口参考IWriter中的定义
 class SegmentWriter {
 public:
     explicit SegmentWriter(const std::string& file_name,
@@ -60,7 +58,6 @@ class SegmentWriter {
     DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_SEGMENT_WRITER_H
diff --git a/be/src/olap/column_file/serialize.cpp b/be/src/olap/serialize.cpp
similarity index 98%
rename from be/src/olap/column_file/serialize.cpp
rename to be/src/olap/serialize.cpp
index c4c91fa1..87f4beb5 100644
--- a/be/src/olap/column_file/serialize.cpp
+++ b/be/src/olap/serialize.cpp
@@ -15,13 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/serialize.h"
+#include "olap/serialize.h"
 
-#include "olap/column_file/file_stream.h"
-#include "olap/column_file/out_stream.h"
+#include "olap/file_stream.h"
+#include "olap/out_stream.h"
 
 namespace doris {
-namespace column_file {
 namespace ser {
 
 OLAPStatus write_var_unsigned(OutStream* stream, int64_t value) {
@@ -277,6 +276,5 @@ OLAPStatus read_ints(ReadOnlyFileStream* input, int64_t* data, uint32_t count, u
 }
 
 } // namespace ser
-} // namespace column_file
 } // namespace doris
 
diff --git a/be/src/olap/column_file/serialize.h b/be/src/olap/serialize.h
similarity index 98%
rename from be/src/olap/column_file/serialize.h
rename to be/src/olap/serialize.h
index 357ee837..7b09a400 100644
--- a/be/src/olap/column_file/serialize.h
+++ b/be/src/olap/serialize.h
@@ -19,10 +19,9 @@
 #define DORIS_BE_SRC_OLAP_COLUMN_FILE_SERIALIZE_H
 
 #include "olap/olap_define.h"
-#include "olap/column_file/byte_buffer.h"
+#include "olap/byte_buffer.h"
 
 namespace doris {
-namespace column_file {
 
 class OutStream;
 class ReadOnlyFileStream;
@@ -154,7 +153,6 @@ inline bool is_safe_subtract(int64_t left, int64_t right) {
 }
 
 } // namespace ser
-} // namespace column_file
 } // namespace doris
 
 #endif
diff --git a/be/src/olap/store.cpp b/be/src/olap/store.cpp
index 1bf989f1..1bc33753 100755
--- a/be/src/olap/store.cpp
+++ b/be/src/olap/store.cpp
@@ -508,8 +508,8 @@ OLAPStatus OlapStore::_load_table_from_header(OLAPEngine* engine, TTabletId tabl
     }
 
     if (olap_table->lastest_version() == nullptr && !olap_table->is_schema_changing()) {
-        LOG(WARNING) << "tablet not in schema change state without delta is invalid. tablet:"
-            << olap_table->full_name();
+        LOG(WARNING) << "tablet not in schema change state without delta is invalid."
+                     << "tablet=" << olap_table->full_name();
         // tablet state is invalid, drop tablet
         olap_table->mark_dropped();
         return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR;
@@ -543,17 +543,74 @@ OLAPStatus OlapStore::_load_table_from_header(OLAPEngine* engine, TTabletId tabl
 }
 
 OLAPStatus OlapStore::load_tables(OLAPEngine* engine) {
-    auto load_table_func = [this, engine](long tablet_id,
-            long schema_hash, const std::string& value) -> bool {
+    auto traverse_header_func = [this, engine](const std::string& key, const std::string& value) -> bool {
+        std::vector<std::string> parts;
+        // key format: "hdr_" + tablet_id + "_" + schema_hash; keys that do not split into 3 parts are logged and skipped
+        split_string<char>(key, '_', &parts);
+        if (parts.size() != 3) {
+            LOG(WARNING) << "invalid header key:" << key << ", splitted size:" << parts.size();
+            return true;
+        }
+        TTabletId tablet_id = std::stol(parts[1], nullptr, 10);
+        TSchemaHash schema_hash = std::stol(parts[2], nullptr, 10);
         OLAPStatus status = _load_table_from_header(engine, tablet_id, schema_hash, value);
         if (status != OLAP_SUCCESS) {
-            LOG(WARNING) << "load table from header failed.tablet_id:" << tablet_id
-                    << ", schema_hash:" << schema_hash << ", status:" << status;
+            LOG(WARNING) << "load table from header failed. status:" << status
+                         << ", tablet=" << tablet_id << "." << schema_hash;
         };
         return true;
     };
-    OLAPStatus status = OlapHeaderManager::traverse_headers(_meta, load_table_func);
+    OLAPStatus status = OlapHeaderManager::traverse_headers(_meta, traverse_header_func);
     return status;
 }
 
+OLAPStatus OlapStore::check_none_row_oriented_table_in_store(OLAPEngine* engine) {
+    auto traverse_header_func = [this, engine](const std::string& key, const std::string& value) -> bool {
+        std::vector<std::string> parts;
+        // Checks the data file type of every tablet header. key format: "hdr_" + tablet_id + "_" + schema_hash
+        split_string<char>(key, '_', &parts);
+        if (parts.size() != 3) {
+            LOG(WARNING) << "invalid header key:" << key << ", splitted size:" << parts.size();
+            return true;
+        }
+        TTabletId tablet_id = std::stol(parts[1], nullptr, 10);
+        TSchemaHash schema_hash = std::stol(parts[2], nullptr, 10);
+        OLAPStatus status = _check_none_row_oriented_table_in_store(engine, tablet_id, schema_hash, value);
+        if (status != OLAP_SUCCESS) {
+            LOG(WARNING) << "check data file type of table failed. status:" << status
+                         << ", tablet=" << tablet_id << "." << schema_hash;
+        }
+        return true;
+    };
+    OLAPStatus status = OlapHeaderManager::traverse_headers(_meta, traverse_header_func);
+    return status;
+}
+
+OLAPStatus OlapStore::_check_none_row_oriented_table_in_store(
+                        OLAPEngine* engine, TTabletId tablet_id,
+                        TSchemaHash schema_hash, const std::string& header) {
+    std::unique_ptr<OLAPHeader> olap_header(new OLAPHeader());
+    bool parsed = olap_header->ParseFromString(header);
+    if (!parsed) {
+        LOG(WARNING) << "parse header string failed for tablet_id:" << tablet_id << " schema_hash:" << schema_hash;
+        return OLAP_ERR_HEADER_PB_PARSE_FAILED;
+    }
+    // the parsed header must be init()-ed before an OLAPTable is built from it
+    RETURN_NOT_OK(olap_header->init());
+    OLAPTablePtr olap_table =
+        OLAPTable::create_from_header(olap_header.release());
+    if (olap_table == nullptr) {
+        LOG(WARNING) << "fail to new table. tablet_id=" << tablet_id << ", schema_hash:" << schema_hash;
+        return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR;
+    }
+    // OLAP_DATA_FILE (row-oriented) tablets are no longer supported; LOG(FATAL) aborts the process.
+    LOG(INFO) << "check data file type. tablet=" << olap_table->full_name() << ", data_file_type:" << olap_table->data_file_type();
+    if (olap_table->data_file_type() == OLAP_DATA_FILE) {
+        LOG(FATAL) << "Not support row-oriented table any more. Please convert it to column-oriented table."
+                   << " tablet=" << olap_table->full_name();
+    }
+
+    return OLAP_SUCCESS;
 }
+
+} // namespace doris
diff --git a/be/src/olap/store.h b/be/src/olap/store.h
index 11326d4c..392da377 100644
--- a/be/src/olap/store.h
+++ b/be/src/olap/store.h
@@ -79,6 +79,10 @@ class OlapStore {
     static std::string get_root_path_from_schema_hash_path_in_trash(const std::string& schema_hash_dir_in_trash);
 
     OLAPStatus load_tables(OLAPEngine* engine);
+    OLAPStatus check_none_row_oriented_table_in_store(OLAPEngine* engine);
+    OLAPStatus _check_none_row_oriented_table_in_store(
+                        OLAPEngine* engine, TTabletId tablet_id,
+                        TSchemaHash schema_hash, const std::string& header);
 
 private:
     std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; }
diff --git a/be/src/olap/column_file/stream_index_common.cpp b/be/src/olap/stream_index_common.cpp
similarity index 95%
rename from be/src/olap/column_file/stream_index_common.cpp
rename to be/src/olap/stream_index_common.cpp
index 7dc218ee..9290faa6 100755
--- a/be/src/olap/column_file/stream_index_common.cpp
+++ b/be/src/olap/stream_index_common.cpp
@@ -15,14 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/stream_index_common.h"
+#include "olap/stream_index_common.h"
 
-#include "olap/column_file/stream_index_common.h"
+#include "olap/stream_index_common.h"
 #include "olap/field.h"
 #include "olap/wrapper_field.h"
 
 namespace doris {
-namespace column_file {
 
 ColumnStatistics::ColumnStatistics() : 
         _minimum(NULL),
@@ -127,5 +126,4 @@ OLAPStatus ColumnStatistics::write_to_buffer(char* buffer, size_t size) {
     return OLAP_SUCCESS;
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/stream_index_common.h b/be/src/olap/stream_index_common.h
similarity index 98%
rename from be/src/olap/column_file/stream_index_common.h
rename to be/src/olap/stream_index_common.h
index fa35c1c0..c4a3be8c 100755
--- a/be/src/olap/column_file/stream_index_common.h
+++ b/be/src/olap/stream_index_common.h
@@ -26,7 +26,6 @@
 
 namespace doris {
 
-namespace column_file {
 
 // 描述streamindex的格式
 struct StreamIndexHeader {
@@ -98,7 +97,6 @@ class ColumnStatistics {
     bool _null_supported;
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_STREAM_INDEX_COMMON_H
diff --git a/be/src/olap/column_file/stream_index_reader.cpp b/be/src/olap/stream_index_reader.cpp
similarity index 97%
rename from be/src/olap/column_file/stream_index_reader.cpp
rename to be/src/olap/stream_index_reader.cpp
index a47663bb..e3d3cdf7 100755
--- a/be/src/olap/column_file/stream_index_reader.cpp
+++ b/be/src/olap/stream_index_reader.cpp
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/stream_index_reader.h"
+#include "olap/stream_index_reader.h"
 
 namespace doris {
-namespace column_file {
 
 PositionEntryReader::PositionEntryReader() :
         _positions(NULL),
@@ -149,5 +148,4 @@ OLAPStatus StreamIndexReader::_parse_header(FieldType type) {
     return res;
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/stream_index_reader.h b/be/src/olap/stream_index_reader.h
similarity index 96%
rename from be/src/olap/column_file/stream_index_reader.h
rename to be/src/olap/stream_index_reader.h
index 381f4cce..22ab8039 100755
--- a/be/src/olap/column_file/stream_index_reader.h
+++ b/be/src/olap/stream_index_reader.h
@@ -18,11 +18,10 @@
 #ifndef DORIS_BE_SRC_OLAP_COLUMN_FILE_STREAM_INDEX_READER_H
 #define DORIS_BE_SRC_OLAP_COLUMN_FILE_STREAM_INDEX_READER_H
 
-#include "olap/column_file/stream_index_common.h"
+#include "olap/stream_index_common.h"
 #include "olap/olap_define.h"
 
 namespace doris {
-namespace column_file {
 
 class PositionEntryReader {
 public:
@@ -98,6 +97,5 @@ class StreamIndexReader {
     PositionEntryReader _entry;
 };
 
-}  // namespace column_file
 }  // namespace doris
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_STREAM_INDEX_READER_H
diff --git a/be/src/olap/column_file/stream_index_writer.cpp b/be/src/olap/stream_index_writer.cpp
similarity index 98%
rename from be/src/olap/column_file/stream_index_writer.cpp
rename to be/src/olap/stream_index_writer.cpp
index f54bc1e6..4231a6ca 100755
--- a/be/src/olap/column_file/stream_index_writer.cpp
+++ b/be/src/olap/stream_index_writer.cpp
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/stream_index_writer.h"
+#include "olap/stream_index_writer.h"
 
 #include <vector>
 
 namespace doris {
-namespace column_file {
 
 PositionEntryWriter::PositionEntryWriter() : _positions_count(0), _statistics_size(0) {
     memset(_statistics_buffer, 0, sizeof(_statistics_buffer));
@@ -181,5 +180,4 @@ OLAPStatus StreamIndexWriter::write_to_buffer(char* buffer, size_t buffer_size)
     return OLAP_SUCCESS;
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/stream_index_writer.h b/be/src/olap/stream_index_writer.h
similarity index 95%
rename from be/src/olap/column_file/stream_index_writer.h
rename to be/src/olap/stream_index_writer.h
index 5219a500..ec289e02 100755
--- a/be/src/olap/column_file/stream_index_writer.h
+++ b/be/src/olap/stream_index_writer.h
@@ -20,11 +20,10 @@
 
 #include <vector>
 
-#include "olap/column_file/file_stream.h"
-#include "olap/column_file/stream_index_common.h"
+#include "olap/file_stream.h"
+#include "olap/stream_index_common.h"
 
 namespace doris {
-namespace column_file {
 
 // 目前写入和读取分离了,主要是因为写入用了一个定长的buffer
 // 不过实际上可以合并
@@ -76,6 +75,5 @@ class StreamIndexWriter {
     FieldType _field_type;
 };
 
-}  // namespace column_file
 }  // namespace doris
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_STREAM_INDEX_WRITER_H
diff --git a/be/src/olap/column_file/stream_name.cpp b/be/src/olap/stream_name.cpp
similarity index 94%
rename from be/src/olap/column_file/stream_name.cpp
rename to be/src/olap/stream_name.cpp
index 543f8953..ada53f31 100755
--- a/be/src/olap/column_file/stream_name.cpp
+++ b/be/src/olap/stream_name.cpp
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/stream_name.h"
+#include "olap/stream_name.h"
 
 namespace doris {
-namespace column_file {
 
 StreamName::StreamName(uint32_t unique_column_id, StreamInfoMessage::Kind kind) : 
         _unique_column_id(unique_column_id),
@@ -47,5 +46,4 @@ bool StreamName::operator == (const StreamName& another) const {
            _kind == another._kind;
 }
 
-}  // namespace column_file
 }  // namespace doris
diff --git a/be/src/olap/column_file/stream_name.h b/be/src/olap/stream_name.h
similarity index 97%
rename from be/src/olap/column_file/stream_name.h
rename to be/src/olap/stream_name.h
index e78bd01e..1e9c3cb6 100755
--- a/be/src/olap/column_file/stream_name.h
+++ b/be/src/olap/stream_name.h
@@ -21,7 +21,6 @@
 #include <gen_cpp/column_data_file.pb.h>
 
 namespace doris {
-namespace column_file {
 
 // 定义流的名字,是流的唯一标识符
 // 实现比较函数,将流在文件中的顺序进行约定:
@@ -45,7 +44,6 @@ class StreamName {
     StreamInfoMessage::Kind _kind;
 };
 
-}  // namespace column_file
 }  // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_STREAM_NAME_H
diff --git a/be/src/olap/writer.cpp b/be/src/olap/writer.cpp
deleted file mode 100644
index 347a9042..00000000
--- a/be/src/olap/writer.cpp
+++ /dev/null
@@ -1,265 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "olap/writer.h"
-
-#include "olap/column_file/data_writer.h"
-#include "olap/olap_data.h"
-#include "olap/olap_index.h"
-#include "olap/olap_table.h"
-#include "olap/row_block.h"
-#include "olap/row_cursor.h"
-
-namespace doris {
-
-IWriter* IWriter::create(OLAPTablePtr table, Rowset *index, bool is_push_write) {
-    IWriter* writer = NULL;
-
-    switch (table->data_file_type()) {
-    case OLAP_DATA_FILE:
-        writer = new (std::nothrow) OLAPDataWriter(table, index, is_push_write);
-        break;
-    case COLUMN_ORIENTED_FILE:
-        writer = new (std::nothrow) column_file::ColumnDataWriter(table, index, is_push_write);
-        break;
-    default:
-        OLAP_LOG_WARNING("unknown data file type. [type=%s]",
-                         DataFileType_Name(table->data_file_type()).c_str());
-        break;
-    }
-
-    return writer;
-}
-
-OLAPDataWriter::OLAPDataWriter(OLAPTablePtr table, Rowset* index, bool is_push_write) : 
-        IWriter(is_push_write, table),
-        _index(index),
-        _data(NULL),
-        _current_segment_size(0),
-        _max_segment_size(OLAP_MAX_SEGMENT_FILE_SIZE),
-        _row_block(NULL),
-        _num_rows(0),
-        _is_push_write(is_push_write) {}
-
-OLAPDataWriter::~OLAPDataWriter() {
-    SAFE_DELETE(_row_block);
-    SAFE_DELETE(_data);
-}
-
-OLAPStatus OLAPDataWriter::init() {
-    return init(_table->num_rows_per_row_block());
-}
-
-OLAPStatus OLAPDataWriter::init(uint32_t num_rows_per_row_block) {
-    OLAPStatus res = OLAP_SUCCESS;
-
-    res = IWriter::init();
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("fail to init res. [res=%d]", res);
-        return res;
-    }
-    
-    if (_table->segment_size() < _max_segment_size) {
-        _max_segment_size = _table->segment_size();
-    }
-
-    _data = new (std::nothrow) OLAPData(_index);
-    if (NULL == _data) {
-        LOG(WARNING) << "fail to new OLAPData. [table='" << _table->full_name() << "']";
-        return OLAP_ERR_MALLOC_ERROR;
-    }
-
-    if (OLAP_SUCCESS != (res = _data->init())) {
-        OLAP_LOG_WARNING("fail to initiate OLAPData. [table='%s' res=%d]",
-                         _table->full_name().c_str(),
-                         res);
-        return res;
-    }
-
-    _row_block = new (std::nothrow) RowBlock(_table->tablet_schema());
-    if (NULL == _row_block) {
-        LOG(WARNING) << "fail to new RowBlock. [table='" << _table->full_name() << "']";
-        return OLAP_ERR_MALLOC_ERROR;
-    }
-
-    OLAP_LOG_DEBUG("init OLAPData writer. [table='%s' block_row_size=%lu]",
-                   _table->full_name().c_str(),
-                   _table->num_rows_per_row_block());
-    
-    RowBlockInfo block_info(0U, num_rows_per_row_block, 0);
-    block_info.data_file_type = OLAP_DATA_FILE;
-    block_info.null_supported = true;
-    if (OLAP_SUCCESS != (res = _row_block->init(block_info))) {
-        OLAP_LOG_WARNING("fail to initiate row block. [res=%d]", res);
-        return res;
-    }
-
-    if (OLAP_SUCCESS != (res = _data->add_segment())) {
-        OLAP_LOG_WARNING("fail to add data segment. [res=%d]", res);
-        return res;
-    }
-
-    if (OLAP_SUCCESS != (res = _index->add_segment())) {
-        OLAP_LOG_WARNING("fail to add index segment. [res=%d]", res);
-        return res;
-    }
-
-    if (_is_push_write) {
-        _write_mbytes_per_sec = config::push_write_mbytes_per_sec;
-    } else {
-        _write_mbytes_per_sec = config::base_compaction_write_mbytes_per_sec;
-    }
-    
-    _speed_limit_watch.reset();
-
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus OLAPDataWriter::attached_by(RowCursor* row_cursor) {
-    if (_row_index >= _table->num_rows_per_row_block()) {
-        if (OLAP_SUCCESS != _flush_row_block()) {
-            OLAP_LOG_WARNING("failed to flush data while attaching row cursor.");
-            return OLAP_ERR_OTHER_ERROR;
-        }
-        RETURN_NOT_OK(_flush_segment_with_verfication());
-    }
-    // Row points to the memory that needs to write in _row_block.
-    _row_block->get_row(_row_index, row_cursor);
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus OLAPDataWriter::write(const char* row) {
-    if (_row_index >= _table->num_rows_per_row_block()) {
-        if (OLAP_SUCCESS != _flush_row_block()) {
-            OLAP_LOG_WARNING("failed to flush data while attaching row cursor.");
-            return OLAP_ERR_OTHER_ERROR;
-        }
-        RETURN_NOT_OK(_flush_segment_with_verfication());
-    }
-    _row_block->set_row(_row_index, row);
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus OLAPDataWriter::_flush_row_block() {
-    if (_row_index < 1) {
-        return OLAP_SUCCESS;
-    }
-
-    if (OLAP_SUCCESS != _row_block->finalize(_row_index)) {
-        OLAP_LOG_WARNING("fail to finalize row block. [num_rows=%u]", _row_index);
-        return OLAP_ERR_WRITER_ROW_BLOCK_ERROR;
-    }
-
-    // Write a ready row block into OLAPData.
-    // Add one index item into Rowset.
-    // Add row block into olap data.
-    uint32_t start_offset;
-    uint32_t end_offset;
-    if (OLAP_SUCCESS != _data->add_row_block(_row_block,
-                                             &start_offset,
-                                             &end_offset)) {
-        OLAP_LOG_WARNING("fail to write data.");
-        return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
-    }
-
-    // Add the corresponding index item into olap index.
-    if (OLAP_SUCCESS != _index->add_row_block(*_row_block, start_offset)) {
-        OLAP_LOG_WARNING("fail to update index.");
-        return OLAP_ERR_WRITER_INDEX_WRITE_ERROR;
-    }
-
-    _current_segment_size = end_offset;
-    _num_rows += _row_block->row_block_info().row_num;
-
-    // In order to reuse row_block, clear the row_block after finalize
-    _row_block->clear();
-    _row_index = 0U;
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus OLAPDataWriter::_flush_segment_with_verfication() {
-    if (UNLIKELY(_current_segment_size < _max_segment_size)) {
-        return OLAP_SUCCESS;
-    }
-    uint32_t data_segment_size;
-    if (OLAP_SUCCESS != _data->finalize_segment(&data_segment_size)) {
-        OLAP_LOG_WARNING("fail to finish segment from olap_data.");
-        return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
-    }
-
-    if (OLAP_SUCCESS != _index->finalize_segment(data_segment_size, _num_rows)) {
-        OLAP_LOG_WARNING("fail to finish segment from olap_index.");
-        return OLAP_ERR_WRITER_INDEX_WRITE_ERROR;
-    }
-
-    if (OLAP_SUCCESS != _data->add_segment()
-        || OLAP_SUCCESS != _index->add_segment()) {
-        OLAP_LOG_WARNING("fail to add data or index segment.");
-        return OLAP_ERR_OTHER_ERROR;
-    }
-
-    _num_rows = 0;
-    _current_segment_size = 0U;
-    return OLAP_SUCCESS;
-}
-
-void OLAPDataWriter::sync() {
-    _data->sync();
-    _index->sync();
-}
-
-// Finalize may be success in spite of write() failure.
-OLAPStatus OLAPDataWriter::finalize() {
-    // Write the last row block into OLAPData
-    if (OLAP_SUCCESS != _flush_row_block()) {
-        OLAP_LOG_WARNING("fail to flush row block.");
-        return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
-    }
-
-    // Finalize data and index segment.
-    uint32_t data_segment_size;
-    if (OLAP_SUCCESS != _data->finalize_segment(&data_segment_size)) {
-        OLAP_LOG_WARNING("fail to finish segment from olap_data.");
-        return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
-    }
-
-    if (OLAP_SUCCESS != _index->finalize_segment(data_segment_size, _num_rows)) {
-        OLAP_LOG_WARNING("fail to finish segment from olap_index.");
-        return OLAP_ERR_WRITER_INDEX_WRITE_ERROR;
-    }
-
-    OLAPStatus res = _index->add_column_statistics(_column_statistics);
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("Fail to set delta pruning![res=%d]", res);
-        return res;
-    }
-
-    _num_rows = 0;
-    _current_segment_size = 0U;
-
-    return OLAP_SUCCESS;
-}
-
-uint64_t OLAPDataWriter::written_bytes() {
-    return _current_segment_size + _index->num_segments() * _max_segment_size;
-}
-
-MemPool* OLAPDataWriter::mem_pool() {
-    return _row_block->mem_pool();
-}
-
-}  // namespace doris
diff --git a/be/src/olap/writer.h b/be/src/olap/writer.h
deleted file mode 100644
index 84dfedcd..00000000
--- a/be/src/olap/writer.h
+++ /dev/null
@@ -1,180 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_SRC_OLAP_WRITER_H
-#define DORIS_BE_SRC_OLAP_WRITER_H
-
-#include "olap/olap_table.h"
-#include "olap/schema.h"
-#include "olap/wrapper_field.h"
-
-namespace doris {
-class OLAPData;
-class Rowset;
-class OLAPTable;
-class RowBlock;
-class RowCursor;
-
-// 抽象一个接口,接口只暴露row级别的写, 不暴露block级别接口
-// 尽量和原接口兼容
-// 先attach出内部指针,再填入数据的方式并不可取,但兼容的考虑先继续使用
-class IWriter {
-public:
-    IWriter(bool is_push_write, OLAPTablePtr table) : 
-            _is_push_write(is_push_write), 
-            _table(table),
-            _column_statistics(
-                _table->num_key_fields(), std::pair<WrapperField*, WrapperField*>(NULL, NULL)),
-            _row_index(0) {}
-    virtual ~IWriter() {
-        for (size_t i = 0; i < _column_statistics.size(); ++i) {
-            SAFE_DELETE(_column_statistics[i].first);
-            SAFE_DELETE(_column_statistics[i].second);
-        } 
-    }
-    virtual OLAPStatus init() {
-        for (size_t i = 0; i < _column_statistics.size(); ++i) {
-            _column_statistics[i].first = WrapperField::create(_table->tablet_schema()[i]);
-            DCHECK(_column_statistics[i].first != nullptr) << "fail to create column statistics field.";
-            _column_statistics[i].first->set_to_max();
-
-            _column_statistics[i].second = WrapperField::create(_table->tablet_schema()[i]);
-            DCHECK(_column_statistics[i].second != nullptr) << "fail to create column statistics field.";
-            _column_statistics[i].second->set_null();
-            _column_statistics[i].second->set_to_min();
-        }
-        return OLAP_SUCCESS;
-    }
-    virtual OLAPStatus attached_by(RowCursor* row_cursor) = 0;
-    void next(const RowCursor& row_cursor) {
-        for (size_t i = 0; i < _table->num_key_fields(); ++i) {
-            char* right = row_cursor.get_field_by_index(i)->get_field_ptr(row_cursor.get_buf());
-            if (_column_statistics[i].first->cmp(right) > 0) {
-                _column_statistics[i].first->copy(right);
-            }
-
-            if (_column_statistics[i].second->cmp(right) < 0) {
-                _column_statistics[i].second->copy(right);
-            }
-        }
-
-        ++_row_index;
-    }
-    void next(const char* row, const Schema* schema) {
-        for (size_t i = 0; i < _table->num_key_fields(); ++i) {
-            char* right = const_cast<char*>(row + schema->get_col_offset(i));
-            if (_column_statistics[i].first->cmp(right) > 0) {
-                _column_statistics[i].first->copy(right);
-            }
-
-            if (_column_statistics[i].second->cmp(right) < 0) {
-                _column_statistics[i].second->copy(right);
-            }
-        }
-
-        ++_row_index;
-    }
-    virtual OLAPStatus write(const char* row) = 0;
-    virtual OLAPStatus finalize() = 0;
-    virtual uint64_t written_bytes() = 0;
-    virtual MemPool* mem_pool() = 0;
-    // Factory function
-    // 调用者获得新建的对象, 并负责delete释放
-    static IWriter* create(OLAPTablePtr table, Rowset* index, bool is_push_write);
-protected:
-    bool _is_push_write;
-    OLAPTablePtr _table;
-    // first is min, second is max
-    std::vector<std::pair<WrapperField*, WrapperField*>> _column_statistics;
-    uint32_t _row_index;
-};
-
-// OLAPDataWriter writes rows into a new version, including data and indexes files.
-// OLAPDataWriter does not take Rowset ownership.
-// Common usage is:
-// 1.     index = new Rowset(table, new_version...)
-// 2.     OLAPDataWriter writer(table, index)
-// 3.     writer.init()
-// ===========================================
-// 4.     loop:
-//          make row block...
-//          writer.flush_row_block(row_block)
-// OR----------------------------------------
-// 4.     RowCursor* cursor
-//        loop:
-//          writer.attached_by(cursor)
-//          reader.read(cursor)
-//          writer.next()
-// ===========================================
-// 5.     writer.finalize()
-// 6.     if errors happen in flush_row_block() or finalize()
-//          index->delete_all_files()
-//          delete index
-//
-// 7.     index->load()
-// 8.     we use index now ...
-class OLAPDataWriter : public IWriter {
-public:
-    OLAPDataWriter(OLAPTablePtr table, Rowset* index, bool is_push_write = false);
-
-    virtual ~OLAPDataWriter();
-
-    virtual OLAPStatus init();
-
-    // Init with custom num rows of row block
-    OLAPStatus init(uint32_t num_rows_per_row_block);
-
-    // In order to avoid memory copy while reading and writing, attach the
-    // row_cursor to the row block being written.
-    // If the number of rows reached maximum, the row_block will be added into
-    // OLAPData, and one index item will be added into Rowset.
-    virtual OLAPStatus attached_by(RowCursor* row_cursor);
-    virtual OLAPStatus write(const char* row);
-
-    // sync data to disk, ignore error
-    void sync();
-
-    // call finalize function after calling writes in spite of failue in writes.
-    virtual OLAPStatus finalize();
-
-    virtual uint64_t written_bytes();
-    virtual MemPool* mem_pool();
-private:
-    // Flush the row block written before to Rowset
-    OLAPStatus _flush_row_block();
-    OLAPStatus _flush_segment_with_verfication();
-
-    Rowset* _index;
-    OLAPData* _data;
-    // current OLAPData Segment size, it is used to prevent OLAPData Segment
-    // size exceeding _max_segment_size(OLAP_MAX_SEGMENT_FILE_SIZE)
-    uint32_t _current_segment_size;
-    uint32_t _max_segment_size;  // default it is OLAP_MAX_SEGMENT_FILE_SIZE
-    RowBlock* _row_block;
-    int64_t _num_rows;
-
-    // write limit
-    bool _is_push_write;
-    uint32_t _write_mbytes_per_sec;
-    OlapStopWatch _speed_limit_watch;
-
-    DISALLOW_COPY_AND_ASSIGN(OLAPDataWriter);
-};
-
-}  // namespace doris
-
-#endif // DORIS_BE_SRC_OLAP_WRITER_H
diff --git a/be/src/tools/meta_tool.cpp b/be/src/tools/meta_tool.cpp
index 39b51ca9..0901c514 100644
--- a/be/src/tools/meta_tool.cpp
+++ b/be/src/tools/meta_tool.cpp
@@ -127,8 +127,17 @@ int main(int argc, char** argv) {
         }
         std::cout << "delete header successfully" << std::endl;
     } else if (FLAGS_operation == "rollback") {
-        auto rollback_func = [&root_path](long tablet_id,
-                long schema_hash, const std::string& value) -> bool {
+        auto rollback_func = [&root_path](const std::string& key,
+                                          const std::string& value) -> bool {
+            std::vector<std::string> parts;
+            // key format: "hdr_" + tablet_id + "_" + schema_hash
+            doris::split_string<char>(key, '_', &parts);
+            if (parts.size() != 3) {
+                LOG(WARNING) << "invalid header key:" << key << ", splitted size:" << parts.size();
+                return true;
+            }
+            int64_t tablet_id = std::stol(parts[1].c_str(), NULL, 10);
+            int64_t schema_hash = std::stol(parts[2].c_str(), NULL, 10);
             OLAPHeader olap_header;
             bool parsed = olap_header.ParseFromString(value);
             if (!parsed) {
diff --git a/be/test/olap/bit_field_test.cpp b/be/test/olap/bit_field_test.cpp
index 507de392..941f624f 100755
--- a/be/test/olap/bit_field_test.cpp
+++ b/be/test/olap/bit_field_test.cpp
@@ -17,15 +17,14 @@
 
 #include <gtest/gtest.h>
 
-#include "olap/column_file/byte_buffer.h"
-#include "olap/column_file/out_stream.h"
-#include "olap/column_file/in_stream.h"
-#include "olap/column_file/bit_field_reader.h"
-#include "olap/column_file/bit_field_writer.h"
+#include "olap/byte_buffer.h"
+#include "olap/out_stream.h"
+#include "olap/in_stream.h"
+#include "olap/bit_field_reader.h"
+#include "olap/bit_field_writer.h"
 #include "util/logging.h"
 
 namespace doris {
-namespace column_file {
 
 class TestBitField : public testing::Test {
 public:
@@ -182,7 +181,6 @@ TEST_F(TestBitField, Skip) {
     ASSERT_EQ(value, 1);
 }
 
-}
 }
 
 int main(int argc, char** argv) {
diff --git a/be/test/olap/bloom_filter_index_test.cpp b/be/test/olap/bloom_filter_index_test.cpp
index f09545e7..a2b651d4 100644
--- a/be/test/olap/bloom_filter_index_test.cpp
+++ b/be/test/olap/bloom_filter_index_test.cpp
@@ -19,14 +19,13 @@
 
 #include <string>
 
-#include "olap/column_file/bloom_filter_reader.h"
-#include "olap/column_file/bloom_filter_writer.h"
+#include "olap/bloom_filter_reader.h"
+#include "olap/bloom_filter_writer.h"
 #include "util/logging.h"
 
 using std::string;
 
 namespace doris {
-namespace column_file {
 
 class TestBloomFilterIndex : public testing::Test {
 public:
@@ -103,7 +102,6 @@ TEST_F(TestBloomFilterIndex, abnormal_read) {
             reader.init(buffer, buffer_size, true, hash_function_num, bit_num));
 }
 
-} // namespace column_file
 } // namespace doris
 
 int main(int argc, char **argv) {
diff --git a/be/test/olap/bloom_filter_test.cpp b/be/test/olap/bloom_filter_test.cpp
index f16c5a01..42e689b9 100644
--- a/be/test/olap/bloom_filter_test.cpp
+++ b/be/test/olap/bloom_filter_test.cpp
@@ -19,13 +19,12 @@
 
 #include <string>
 
-#include "olap/column_file/bloom_filter.hpp"
+#include "olap/bloom_filter.hpp"
 #include "util/logging.h"
 
 using std::string;
 
 namespace doris {
-namespace column_file {
 
 class TestBloomFilter : public testing::Test {
 public:
@@ -153,7 +152,6 @@ TEST_F(TestBloomFilter, bloom_filter_info) {
     LOG(WARNING) << "bytes=" << bytes << " points=" << points;
 }
 
-} // namespace column_file
 } // namespace doris
 
 int main(int argc, char **argv) {
diff --git a/be/test/olap/byte_buffer_test.cpp b/be/test/olap/byte_buffer_test.cpp
index fe2a4017..7ec4ae05 100755
--- a/be/test/olap/byte_buffer_test.cpp
+++ b/be/test/olap/byte_buffer_test.cpp
@@ -18,12 +18,11 @@
 #include <gtest/gtest.h>
 #include <sys/mman.h>
 
-#include "olap/column_file/byte_buffer.h"
+#include "olap/byte_buffer.h"
 #include "olap/file_helper.h"
 #include "util/logging.h"
 
 namespace doris {
-namespace column_file {
 
 class TestByteBuffer : public testing::Test {
 public:
@@ -183,7 +182,6 @@ TEST_F(TestByteBuffer, TestMmap) {
     }
 }
 
-}
 }
 
 int main(int argc, char** argv) {
diff --git a/be/test/olap/column_reader_test.cpp b/be/test/olap/column_reader_test.cpp
index 20e88f71..a1145f43 100644
--- a/be/test/olap/column_reader_test.cpp
+++ b/be/test/olap/column_reader_test.cpp
@@ -17,10 +17,10 @@
 
 #include <gtest/gtest.h>
 
-#include "olap/column_file/byte_buffer.h"
-#include "olap/column_file/stream_name.h"
-#include "olap/column_file/column_reader.h"
-#include "olap/column_file/column_writer.h"
+#include "olap/byte_buffer.h"
+#include "olap/stream_name.h"
+#include "olap/column_reader.h"
+#include "olap/column_writer.h"
 #include "olap/field.h"
 #include "olap/olap_define.h"
 #include "olap/olap_common.h"
@@ -33,7 +33,6 @@
 using std::string;
 
 namespace doris {
-namespace column_file {
 
 class TestColumn : public testing::Test {
 public:
@@ -3071,7 +3070,6 @@ TEST_F(TestColumn, VectorizedDirectVarcharColumnWith65533) {
     }   
 }
 
-}
 }
 
 int main(int argc, char** argv) {
diff --git a/be/test/olap/run_length_byte_test.cpp b/be/test/olap/run_length_byte_test.cpp
index f98f8acf..db352bd3 100755
--- a/be/test/olap/run_length_byte_test.cpp
+++ b/be/test/olap/run_length_byte_test.cpp
@@ -17,19 +17,18 @@
 
 #include <gtest/gtest.h>
 
-#include "olap/column_file/byte_buffer.h"
-#include "olap/column_file/out_stream.h"
-#include "olap/column_file/in_stream.h"
-#include "olap/column_file/file_stream.h"
-#include "olap/column_file/run_length_byte_writer.h"
-#include "olap/column_file/run_length_byte_reader.h"
-#include "olap/column_file/column_reader.h"
-#include "olap/column_file/stream_index_reader.h"
-#include "olap/column_file/stream_index_writer.h"
+#include "olap/byte_buffer.h"
+#include "olap/out_stream.h"
+#include "olap/in_stream.h"
+#include "olap/file_stream.h"
+#include "olap/run_length_byte_writer.h"
+#include "olap/run_length_byte_reader.h"
+#include "olap/column_reader.h"
+#include "olap/stream_index_reader.h"
+#include "olap/stream_index_writer.h"
 #include "util/logging.h"
 
 namespace doris {
-namespace column_file {
 
 using namespace testing;
 
@@ -845,7 +844,6 @@ TEST_F(TestRunLengthByte, Skip) {
     ASSERT_EQ(value, 0x5e);
 }
 
-}
 }
 
 int main(int argc, char** argv) {
diff --git a/be/test/olap/run_length_integer_test.cpp b/be/test/olap/run_length_integer_test.cpp
index eac94349..2920e516 100755
--- a/be/test/olap/run_length_integer_test.cpp
+++ b/be/test/olap/run_length_integer_test.cpp
@@ -17,17 +17,16 @@
 
 #include <gtest/gtest.h>
 
-#include "olap/column_file/byte_buffer.h"
-#include "olap/column_file/out_stream.h"
-#include "olap/column_file/in_stream.h"
-#include "olap/column_file/run_length_integer_writer.h"
-#include "olap/column_file/run_length_integer_reader.h"
-#include "olap/column_file/stream_index_writer.h"
-#include "olap/column_file/stream_index_reader.h"
+#include "olap/byte_buffer.h"
+#include "olap/out_stream.h"
+#include "olap/in_stream.h"
+#include "olap/run_length_integer_writer.h"
+#include "olap/run_length_integer_reader.h"
+#include "olap/stream_index_writer.h"
+#include "olap/stream_index_reader.h"
 #include "util/logging.h"
 
 namespace doris {
-namespace column_file {
 
 class TestRunLengthUnsignInteger : public testing::Test {
 public:
@@ -839,7 +838,6 @@ TEST_F(TestRunLengthSignInteger, DirectEncodingForDeltaOverflows2) {
    
 }
 
-}
 }
 
 int main(int argc, char** argv) {
diff --git a/be/test/olap/serialize_test.cpp b/be/test/olap/serialize_test.cpp
index 1cbc03a1..8bdb7b2b 100644
--- a/be/test/olap/serialize_test.cpp
+++ b/be/test/olap/serialize_test.cpp
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/column_file/serialize.h"
+#include "olap/serialize.h"
 
 #include <gtest/gtest.h>
 
 namespace doris {
-namespace column_file {
 namespace ser {
 
 class SerializeTest : public testing::Test {
@@ -233,7 +232,6 @@ TEST_F(SerializeTest, new_percentile_bits) {
 }
 
 
-}
 }
 }
 
diff --git a/be/test/olap/stream_index_test.cpp b/be/test/olap/stream_index_test.cpp
index e7927e28..ead34f83 100755
--- a/be/test/olap/stream_index_test.cpp
+++ b/be/test/olap/stream_index_test.cpp
@@ -26,16 +26,15 @@
 #include "olap/olap_common.h"
 #include "olap/row_cursor.h"
 #include "olap/wrapper_field.h"
-#include "olap/column_file/stream_index_common.h"
-#include "olap/column_file/stream_index_writer.h"
-#include "olap/column_file/stream_index_reader.h"
-#include "olap/column_file/file_stream.h"
+#include "olap/stream_index_common.h"
+#include "olap/stream_index_writer.h"
+#include "olap/stream_index_reader.h"
+#include "olap/file_stream.h"
 #include "util/logging.h"
 
 using namespace std;
 
 namespace doris {
-namespace column_file {
 
 class TestStreamIndex : public testing::Test {
 public:
@@ -383,7 +382,6 @@ TEST_F(TestStreamIndex, statistic) {
     }
 }
 
-}
 }
 
 int main(int argc, char** argv) {
diff --git a/gensrc/proto/column_data_file.proto b/gensrc/proto/column_data_file.proto
index 88904742..e6e35835 100644
--- a/gensrc/proto/column_data_file.proto
+++ b/gensrc/proto/column_data_file.proto
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package doris.column_file;
+package doris;
 
 import "olap_common.proto";
 
diff --git a/gensrc/script/gen_build_version.sh b/gensrc/script/gen_build_version.sh
index 0337e79b..82cf00e2 100755
--- a/gensrc/script/gen_build_version.sh
+++ b/gensrc/script/gen_build_version.sh
@@ -150,7 +150,7 @@ namespace doris {
 #define PALO_BUILD_TIME    "${build_time}"
 #define PALO_BUILD_INFO    "${build_info}"
 
-} // namespace palo
+} // namespace doris
 
 #endif
 EOF
diff --git a/tools/row_to_column/README b/tools/row_to_column/README
new file mode 100644
index 00000000..34a53f8f
--- /dev/null
+++ b/tools/row_to_column/README
@@ -0,0 +1,11 @@
+行存转列存步骤
+1. 在conf中填写相应的集群配置(fe_host、port、http_port、username、password)。
+2. 运行python convert_row_to_column.py,生成所有行存表转列存所需的命令。
+3. 如果脚本运行后没有任何输出,说明集群中不存在行存表,可以直接进行升级。
+4. 如果脚本输出中显示有表的信息获取失败,
+   请在集群中用show create table查看该表的storage_type,如果是行存,请手动进行转换。
+5. schema change完成行存转列存之后,旧的schema需要在10分钟后才会被删除。
+   请等待一段时间,确认旧schema的数据已被删除后再进行升级。
+   否则,be进程启动时的自动检测会报错并退出。
+6. 再次运行python convert_row_to_column.py,确认集群中确实没有行存表后再进行升级。
+
diff --git a/tools/row_to_column/conf b/tools/row_to_column/conf
new file mode 100644
index 00000000..efde1cd2
--- /dev/null
+++ b/tools/row_to_column/conf
@@ -0,0 +1,6 @@
+[cluster]
+fe_host = 
+port = 
+http_port =
+username = 
+password = 
diff --git a/tools/row_to_column/convert_row_to_column.py b/tools/row_to_column/convert_row_to_column.py
new file mode 100644
index 00000000..b0738c1d
--- /dev/null
+++ b/tools/row_to_column/convert_row_to_column.py
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import ConfigParser
+import json
+import os
+import re
+import sys
+import time
+from urllib import quote, urlopen
+
+import MySQLdb
+
+# Substring that the FE _get_ddl output contains for row-oriented tables.
+ROW_STORAGE_MARKER = "\"storage_type\" = \"ROW\""
+
+
+class convert_row_to_column(object):
+    """
+    Find row-oriented tables in a PALO cluster and print the ALTER TABLE
+    statements that convert them to column storage.
+    """
+
+    def __init__(self):
+        # Initialized up front so close() is safe even if connect() failed.
+        self.db = None
+        self.cur = None
+
+    def connect(self, host, port, http_port, username, password):
+        """
+        Use MySQLdb to connect to PALO.
+
+        Returns True on success. On failure the MySQL error is printed and
+        False is returned so the caller can stop, instead of crashing later
+        on a cursor that was never created.
+        """
+        self.host = host
+        self.port = port
+        self.http_port = http_port
+        self.username = username
+        self.passwd = password
+        try:
+            self.db = MySQLdb.connect(host=self.host, port=self.port,
+                                      user=self.username,
+                                      passwd=self.passwd)
+            self.cur = self.db.cursor()
+        except MySQLdb.Error as e:
+            print ("error %s:%s" % (str(e.args[0]), e.args[1]))
+            return False
+        return True
+
+    def close(self):
+        """Release the cursor and connection; safe to call more than once."""
+        if self.db is not None and self.db.open:
+            if self.cur is not None:
+                self.cur.close()
+            self.db.close()
+        self.cur = None
+        self.db = None
+
+    def _fetch_ddl(self, database, table):
+        """
+        Return the CREATE TABLE statement of database.table as a utf-8 str,
+        fetched from the FE HTTP _get_ddl API, or None if that fails.
+        """
+        # Percent-encode credentials and names so characters such as '@',
+        # ':', '/', '&' or '#' cannot corrupt the URL.
+        url = "http://%s:%s@%s:%s/api/_get_ddl?db=default_cluster:%s&tbl=%s" % (
+                quote(self.username, safe=""), quote(self.passwd, safe=""),
+                self.host, self.http_port,
+                quote(database, safe=""), quote(table, safe=""))
+        try:
+            doc = json.loads(urlopen(url).read())
+            return doc["TABLE"][0].encode("utf-8")
+        except Exception as err:
+            # Report the table, not the URL: the URL embeds the password.
+            print "db: %s, tbl: %s, error: %s" % (database, table, err)
+            return None
+
+    def run(self):
+        """Print an ALTER TABLE statement for every row-oriented table."""
+        self.cur.execute("show databases")
+        databases = [row[0] for row in self.cur.fetchall()]
+        for database in databases:
+            self.cur.execute("show tables from `" + database + "`")
+            tables = [row[0] for row in self.cur.fetchall()]
+            for table in tables:
+                ddl = self._fetch_ddl(database, table)
+                if ddl is None or ddl.find(ROW_STORAGE_MARKER) == -1:
+                    continue
+                # Prefer the name as spelled in the DDL; fall back to the
+                # SHOW TABLES name if the DDL header does not match.
+                match = re.search('CREATE TABLE `(.*)`', ddl)
+                if match is not None:
+                    table = match.group(1)
+                print "alter table `%s`.`%s` set(\"storage_type\"=\"column\");" % (
+                        database, table)
+
+def main():
+    cf = ConfigParser.ConfigParser()
+    # read() silently skips missing files, which would otherwise surface
+    # later as a confusing NoSectionError.
+    if not cf.read("./conf"):
+        print "fail to read ./conf"
+        sys.exit(1)
+    host = cf.get('cluster', 'fe_host')
+    port = int(cf.get('cluster', 'port'))
+    http_port = int(cf.get('cluster', 'http_port'))
+    user = cf.get('cluster', 'username')
+    passwd = cf.get('cluster', 'password')
+
+    converter = convert_row_to_column()
+    if not converter.connect(host, port, http_port, user, passwd):
+        sys.exit(1)
+    try:
+        converter.run()
+    finally:
+        converter.close()
+
+if __name__ == '__main__':
+    main()
+
diff --git a/tools/row_to_column/get_ddl_stmt.py b/tools/row_to_column/get_ddl_stmt.py
new file mode 100644
index 00000000..2a0cb1d7
--- /dev/null
+++ b/tools/row_to_column/get_ddl_stmt.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Dump the CREATE TABLE statements of a list of tables.
+
+Usage: python get_ddl_stmt.py <db> <table_list_file>
+
+<table_list_file> holds one table name per line. Each DDL is printed to
+stdout and also written to ./create.sql. Cluster settings are read from
+./conf.
+"""
+
+import ConfigParser
+import re
+import sys
+import os
+import json
+from urllib import quote, urlopen
+
+if len(sys.argv) != 3:
+    print "Usage: python %s <db> <table_list_file>" % sys.argv[0]
+    sys.exit(1)
+
+config = ConfigParser.ConfigParser()
+# read() silently skips a missing file; fail early with a clear message.
+if not config.read("conf"):
+    print "fail to read ./conf"
+    sys.exit(1)
+username = config.get("cluster", "username")
+passwd = config.get("cluster", "password")
+fe_host = config.get("cluster", "fe_host")
+fe_http_port = config.get("cluster", "http_port")
+db = sys.argv[1]
+table_name = sys.argv[2]
+
+# One table name per line. Strip the trailing newline (otherwise it would
+# end up inside the URL) and skip blank lines.
+with open(table_name, "r") as f_table:
+    tbls = [line.strip() for line in f_table if line.strip()]
+
+with open("create.sql", "w") as fw_create:
+    for tbl in tbls:
+        # Percent-encode credentials and names so special characters cannot
+        # corrupt the URL.
+        url = "http://%s:%s@%s:%s/api/_get_ddl?db=default_cluster:%s&tbl=%s" % (
+                quote(username, safe=""), quote(passwd, safe=""),
+                fe_host, fe_http_port, quote(db, safe=""), quote(tbl, safe=""))
+
+        doc = json.loads(urlopen(url).read())
+
+        create_table_stmt = doc["TABLE"]
+        ddl = create_table_stmt[0].encode("utf-8")
+        print ddl
+        # Persist each DDL to create.sql as well as printing it.
+        fw_create.write(ddl + "\n")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@doris.apache.org
For additional commands, e-mail: dev-help@doris.apache.org