Posted to commits@doris.apache.org by ji...@apache.org on 2022/05/05 01:31:43 UTC

[incubator-doris] branch master updated: [fix][doc]add design doc (#9324)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a578b02a5 [fix][doc]add design doc (#9324)
4a578b02a5 is described below

commit 4a578b02a5d7272a15ec23a0f1f98db3e376b9a6
Author: jiafeng.zhang <zh...@gmail.com>
AuthorDate: Thu May 5 09:31:37 2022 +0800

    [fix][doc]add design doc (#9324)
    
    add design doc
---
 docs/.vuepress/sidebar/en.js                      |  12 +-
 docs/.vuepress/sidebar/zh-CN.js                   |  11 +
 docs/en/design/Flink doris connector Design.md    | 259 +++++++++++
 docs/en/design/doris_storage_optimization.md      | 235 ++++++++++
 docs/en/design/grouping_sets_design.md            | 501 +++++++++++++++++++++
 docs/en/design/metadata-design.md                 | 127 ++++++
 docs/zh-CN/design/doris_storage_optimization.md   | 234 ++++++++++
 docs/zh-CN/design/flink_doris_connector_design.md | 272 ++++++++++++
 docs/zh-CN/design/grouping_sets_design.md         | 517 ++++++++++++++++++++++
 docs/zh-CN/design/metadata-design.md              | 126 ++++++
 docs/zh-CN/design/spark_load.md                   | 212 +++++++++
 11 files changed, 2505 insertions(+), 1 deletion(-)

diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index 9a599bc224..0170060b47 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -967,7 +967,17 @@ module.exports = [
       "tpc-h"
     ],
   },
-    {
+  {
+    title: "Design Documents",
+    directoryPath: "design/",
+    initialOpenGroupIndex: -1,
+    children: [
+      "doris_storage_optimization",
+      "grouping_sets_design",
+      "metadata-design",
+    ],
+  },  
+  {
     title: "Doris User",
     directoryPath: "case-user/",
     initialOpenGroupIndex: -1,
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index f2396b94d1..365f41e9c5 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -967,6 +967,17 @@ module.exports = [
       "tpc-h"
     ],
   },
+  {
+    title: "设计文档",
+    directoryPath: "design/",
+    initialOpenGroupIndex: -1,
+    children: [
+      "doris_storage_optimization",
+      "grouping_sets_design",
+      "metadata-design",
+      "spark_load",
+    ],
+  },  
   {
     title: "Doris用户",
     directoryPath: "case-user/",
diff --git a/docs/en/design/Flink doris connector Design.md b/docs/en/design/Flink doris connector Design.md
new file mode 100644
index 0000000000..05481c67bf
--- /dev/null
+++ b/docs/en/design/Flink doris connector Design.md	
@@ -0,0 +1,259 @@
+---
+{
+    "title": "Flink doris connector Design",
+    "language": "en"
+}
+
+
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink doris connector Design
+
+
+
+First of all, thanks to the author of the community Spark Doris Connector
+
+From the perspective of Doris, by introducing its data into Flink, Flink can use a series of rich ecological products, which broadens the imagination of the product and also makes it possible to query Doris and other data sources jointly.
+
+Starting from our business architecture and business needs, we chose Flink as part of our architecture, the ETL and real-time computing framework for data. The community currently supports Spark doris connector, so we designed and developed Flink doris Connector with reference to Spark doris connector.
+
+## Technical Choice
+
+During the initial technology selection, we faced the same choices as the Spark Doris Connector, so we first considered the JDBC approach. As described in the Spark Doris Connector article, that approach has its advantages, but its disadvantages are more obvious. We then read and tested the Spark Doris Connector code and decided to stand on the shoulders of giants (that is, to reuse and adapt that code directly).
+
+The following passage is copied directly from the Spark Doris Connector blog:
+
+```
+Therefore, we developed a new data source Spark-Doris-Connector for Doris. Under this scheme, Doris can publish Doris data and distribute it to Spark. The Spark driver accesses Doris's FE to obtain the Doris table architecture and basic data distribution. After that, according to this data distribution, the data query task is reasonably allocated to the executors. Finally, Spark's execution program accesses different BEs for querying. Greatly improve query efficiency
+```
+
+## 1. Instructions
+
+Compile and generate doris-flink-1.0.0-SNAPSHOT.jar in the extension/flink-doris-connector/ directory of the Doris code base, add this jar package to Flink's ClassPath, and then you can use the Flink-on-Doris functionality.
+
+## 2. How to use
+
+Compile and generate doris-flink-1.0.0-SNAPSHOT.jar in the extension/flink-doris-connector/ directory of the Doris code library, add this jar package to Flink's ClassPath, and then use the Flink-on-Doris features as shown below.
+
+#### 2.1 SQL way
+
+Supported functions:
+
+1. Reading data from Doris tables into Flink through Flink SQL for computation
+2. Inserting data into the corresponding Doris table through Flink SQL; the underlying implementation communicates directly with BE through Stream Load to complete the insert
+3. Joining Doris tables with tables from non-Doris external data sources in Flink for federated analysis
+
+Example:
+
+
+
+```java
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        tEnv.executeSql(
+                "CREATE TABLE test_aggregation01 (" +
+                        "user_id STRING," +
+                        "user_city STRING," +
+                        "age INT," +
+                        "last_visit_date STRING" +
+                        ") " +
+                        "WITH (\n" +
+                        "  'connector' = 'doris',\n" +
+                        "  'fenodes' = 'doris01:8030',\n" +
+                        "  'table.identifier' = 'demo.test_aggregation',\n" +
+                        "  'username' = 'root',\n" +
+                        "  'password' = ''\n" +
+                        ")");
+        tEnv.executeSql(
+                "CREATE TABLE test_aggregation02 (" +
+                        "user_id STRING," +
+                        "user_city STRING," +
+                        "age INT," +
+                        "last_visit_date STRING" +
+                        ") " +
+                        "WITH (\n" +
+                        "  'connector' = 'doris',\n" +
+                        "  'fenodes' = 'doris01:8030',\n" +
+                        "  'table.identifier' = 'demo.test_aggregation_01',\n" +
+                        "  'username' = 'root',\n" +
+                        "  'password' = ''\n" +
+                        ")");
+
+        tEnv.executeSql("INSERT INTO test_aggregation02 select * from test_aggregation01");
+        tEnv.executeSql("select count(1) from test_aggregation01");
+```
+
+#### 2.2 DataStream way:
+
+```java
+DorisOptions.Builder options = DorisOptions.builder()
+                .setFenodes("$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+                .setUsername("$YOUR_DORIS_USERNAME")
+                .setPassword("$YOUR_DORIS_PASSWORD")
+                .setTableIdentifier("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME");
+env.addSource(new DorisSourceFunction<>(options.build(),new SimpleListDeserializationSchema())).print();
+```
+
+## 3. Applicable scenarios
+
+![1616987965864](/images/Flink-doris-connector.png)
+
+#### 3.1. Use Flink to perform joint analysis on data in Doris and other data sources
+
+Many business departments keep their data in different storage systems: online analytics and reporting data in Doris, structured retrieval data in Elasticsearch, transactional data in MySQL, and so on. Businesses often need to analyze data across multiple storage sources. After connecting Flink and Doris through the Flink Doris connector, companies can directly use Flink to perform joint query calculations on the data in Doris and multipl [...]
+
+#### 3.2 Real-time data access
+
+Before the Flink Doris Connector: for irregular business data, it was usually necessary to standardize the messages, filter out null values, write the results to new topics, and then schedule regular load jobs to write the data into Doris.
+
+![1616988281677](/images/Flink-doris-connector1.png)
+
+After the Flink Doris Connector: Flink reads from Kafka and writes to Doris directly.
+
+![1616988514873](/images/Flink-doris-connector2.png)
+
+## 4. Technical implementation
+
+### 4.1 Architecture diagram
+
+![1616997396610](/images/Flink-doris-connector-architecture.png)
+
+### 4.2 External capabilities provided by Doris
+
+#### 4.2.1 Doris FE
+
+FE exposes interfaces for obtaining table metadata, single-table query plans, and some statistics.
+
+All REST API interfaces require HTTP Basic authentication. The username and password are the ones used to log in to the database; make sure the account has the correct permissions.
+
+```
+// Get table schema meta information
+GET api/{database}/{table}/_schema
+
+// Get the query plan template for a single table
+POST api/{database}/{table}/_query_plan
+{
+"sql": "select k1, k2 from {database}.{table}"
+}
+
+// Get the table size
+GET api/{database}/{table}/_count
+```
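+
+As an illustration of the HTTP Basic authentication described above, here is a minimal sketch of calling the schema interface with Java's built-in HTTP client. The host `doris01:8030`, database `demo`, and table `test_aggregation` are placeholders taken from the SQL example in section 2.1.
+
+```java
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Base64;
+
+public class FeSchemaClientSketch {
+    public static void main(String[] args) throws Exception {
+        // HTTP Basic credentials: the database username and password
+        String auth = Base64.getEncoder().encodeToString("root:".getBytes("UTF-8"));
+
+        // GET api/{database}/{table}/_schema on the FE HTTP port
+        HttpRequest request = HttpRequest.newBuilder()
+                .uri(URI.create("http://doris01:8030/api/demo/test_aggregation/_schema"))
+                .header("Authorization", "Basic " + auth)
+                .GET()
+                .build();
+
+        HttpResponse<String> response = HttpClient.newHttpClient()
+                .send(request, HttpResponse.BodyHandlers.ofString());
+        System.out.println(response.body()); // JSON description of the table schema
+    }
+}
+```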
+
+#### 4.2.2 Doris BE
+
+BE directly exposes data filtering, scanning, and pruning capabilities through the Thrift protocol.
+
+```
+service TDorisExternalService {
+    // Initialize the query executor
+    TScanOpenResult open_scanner(1: TScanOpenParams params);
+
+    // Fetch data in streaming batches, in the Apache Arrow data format
+    TScanBatchResult get_next(1: TScanNextBatchParams params);
+
+    // End the scan
+    TScanCloseResult close_scanner(1: TScanCloseParams params);
+}
+```
+
+For definitions of Thrift related structures, please refer to:
+
+https://github.com/apache/incubator-doris/blob/master/gensrc/thrift/DorisExternalService.thrift
+
+### 4.3 Implement DataStream
+
+DorisSourceFunction extends org.apache.flink.streaming.api.functions.source.RichSourceFunction. During initialization, it obtains the query plan of the target table and the corresponding partitions.
+
+Override the run method to read data from the partitions in a loop.
+
+```java
+public void run(SourceContext sourceContext){
+       //Cycle through the partitions
+        for(PartitionDefinition partitions : dorisPartitions){
+            scalaValueReader = new ScalaValueReader(partitions, settings);
+            while (scalaValueReader.hasNext()){
+                Object next = scalaValueReader.next();
+                sourceContext.collect(next);
+            }
+        }
+}
+```
+
+### 4.4 Implement Flink SQL on Doris
+
+Referring to [Flink Custom Source&Sink](https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/sourceSinks.html) and the Flink JDBC connector, we implemented the following. As a result, Flink SQL can be used to read and write Doris tables directly.
+
+#### 4.4.1 Implementation details
+
+1. Implement DynamicTableSourceFactory and DynamicTableSinkFactory to register the doris connector (a factory sketch follows this list)
+2. Implement custom DynamicTableSource and DynamicTableSink to generate the logical plan
+3. DorisRowDataInputFormat and DorisDynamicOutputFormat receive the logical plan and start execution
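+
+The following is a minimal sketch, not the actual connector code, of registering such a factory through Flink's DynamicTableSourceFactory SPI. The option names mirror the WITH clause in section 2.1; building the actual table source is omitted to keep the sketch short.
+
+```java
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+
+public class DorisTableSourceFactorySketch implements DynamicTableSourceFactory {
+
+    // Options mirroring the WITH (...) clause used in section 2.1
+    private static final ConfigOption<String> FENODES =
+            ConfigOptions.key("fenodes").stringType().noDefaultValue();
+    private static final ConfigOption<String> TABLE_IDENTIFIER =
+            ConfigOptions.key("table.identifier").stringType().noDefaultValue();
+    private static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username").stringType().noDefaultValue();
+    private static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password").stringType().defaultValue("");
+
+    @Override
+    public String factoryIdentifier() {
+        return "doris"; // matches 'connector' = 'doris'
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(FENODES);
+        options.add(TABLE_IDENTIFIER);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        return options;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        // A real factory would validate the options and build the custom DynamicTableSource here;
+        // the construction is omitted to keep this sketch short.
+        throw new UnsupportedOperationException("table source construction omitted in this sketch");
+    }
+}
+```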
+
+![1616747472136](/images/table_connectors.svg)
+
+The most important implementations are DorisRowDataInputFormat and DorisDynamicOutputFormat, which are customized on the basis of RichInputFormat and RichOutputFormat.
+
+In DorisRowDataInputFormat, the obtained dorisPartitions are divided into multiple shards in createInputSplits for parallel computing.
+
+```java
+public DorisTableInputSplit[] createInputSplits(int minNumSplits) {
+		List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
+		int splitNum = 0;
+		for (PartitionDefinition partition : dorisPartitions) {
+			dorisSplits.add(new DorisTableInputSplit(splitNum++,partition));
+		}
+		return dorisSplits.toArray(new DorisTableInputSplit[0]);
+}
+
+public RowData nextRecord(RowData reuse)  {
+		if (!hasNext) {
+            //After reading the data, return null
+			return null;
+		}
+		List next = (List)scalaValueReader.next();
+		GenericRowData genericRowData = new GenericRowData(next.size());
+		for(int i =0;i<next.size();i++){
+			genericRowData.setField(i, next.get(i));
+		}
+		//Determine if there is still data
+		hasNext = scalaValueReader.hasNext();
+		return genericRowData;
+}
+```
+
+In DorisDynamicOutputFormat, data is written to Doris through Stream Load. For the Stream Load client, refer to org.apache.doris.plugin.audit.DorisStreamLoader.
+
+```java
+public  void writeRecord(RowData row) throws IOException {
+       //streamload Default delimiter \t
+        StringJoiner value = new StringJoiner("\t");
+        GenericRowData rowData = (GenericRowData) row;
+        for(int i = 0; i < row.getArity(); ++i) {
+            value.add(rowData.getField(i).toString());
+        }
+        //streamload write data
+        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value.toString());
+        System.out.println(loadResponse);
+}
+```
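+
+For reference, the following is a minimal sketch of what a Stream Load request looks like at the HTTP level, assuming a BE host `be01` with HTTP port 8040 and tab-separated rows like those produced by writeRecord above; the connector's real DorisStreamLoad class additionally handles labels, retries, error handling, and FE redirection.
+
+```java
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Base64;
+
+public class StreamLoadSketch {
+    public static void main(String[] args) throws Exception {
+        String auth = Base64.getEncoder().encodeToString("root:".getBytes("UTF-8"));
+        String rows = "a\tA\t1\na\tB\t2\n"; // tab-separated columns, one row per line
+
+        // PUT api/{database}/{table}/_stream_load on a BE HTTP port (assumed 8040 here)
+        HttpRequest request = HttpRequest.newBuilder()
+                .uri(URI.create("http://be01:8040/api/demo/test_aggregation/_stream_load"))
+                .header("Authorization", "Basic " + auth)
+                .header("label", "flink-demo-" + System.currentTimeMillis()) // optional, makes the load idempotent
+                .PUT(HttpRequest.BodyPublishers.ofString(rows))
+                .build();
+
+        HttpResponse<String> response = HttpClient.newHttpClient()
+                .send(request, HttpResponse.BodyHandlers.ofString());
+        System.out.println(response.body()); // JSON result describing the load status
+    }
+}
+```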
+
diff --git a/docs/en/design/doris_storage_optimization.md b/docs/en/design/doris_storage_optimization.md
new file mode 100644
index 0000000000..4cfb586627
--- /dev/null
+++ b/docs/en/design/doris_storage_optimization.md
@@ -0,0 +1,235 @@
+---
+{
+    "title": "Doris Storage File Format Optimization",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Doris Storage File Format Optimization #
+
+## File format ##
+
+![](/images/segment_v2.png)
+<center>Figure 1. Doris segment file format</center>
+
+The file consists of:
+- An 8-byte magic code at the start, used to identify the file format and version
+- Data Region: stores the data of each column; the data is loaded on demand, page by page
+- Index Region: Doris stores the index data of each column in the Index Region. Index data is loaded at column granularity, so it is stored separately from the column data.
+- Footer
+	- FileFooterPB: defines the metadata of the file
+	- A 4-byte checksum of the FileFooterPB content
+	- A 4-byte FileFooterPB message length, used to read the FileFooterPB
+	- The 8-byte MAGIC CODE, stored at the very end so that the file type can be identified in different scenarios
+
+The data in the file is organized into pages, which are the basic unit of encoding and compression. The current page types include the following:
+
+### DataPage ###
+
+Data Page is divided into two types: nullable and non-nullable data pages.
+
+A nullable data page includes:
+```
+
+                 +----------------+
+                 |  value count   |
+                 |----------------|
+                 |  first row id  |
+                 |----------------|
+                 | bitmap length  |
+                 |----------------|
+                 |  null bitmap   |
+                 |----------------|
+                 |     data       |
+                 |----------------|
+                 |    checksum    |
+                 +----------------+
+```
+
+The structure of a non-nullable data page is as follows:
+
+```
+                 |----------------|
+                 |   value count  |
+                 |----------------|
+                 |  first row id  |
+                 |----------------|
+                 |     data       |
+                 |----------------|
+                 |    checksum    |
+                 +----------------+
+```
+
+The meanings of each field are as follows:
+
+- value count
+	- the number of rows in the page
+- first row id
+	- the row number of the first row in the page
+- bitmap length
+	- the number of bytes in the following bitmap
+- null bitmap
+	- the bitmap recording which values are null
+- data
+	- the data after encoding and compression (a small serialization sketch follows this list)
+	- the header of the data needs to record: is_compressed
+	- data produced by different encodings needs to record some extra fields in its header so that the data can be parsed
+	- TODO: add the header information for the various encodings
+- checksum
+	- the page-granularity checksum, covering the page header and the actual data that follows
+
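+To make the layout above concrete, here is a minimal sketch of serializing a nullable data page, assuming a 4-byte value count and bitmap length, an 8-byte first row id, little-endian integers, and a CRC32 checksum; the actual field widths, byte order, and checksum algorithm are defined by the storage implementation.
+
+```java
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.zip.CRC32;
+
+public class NullableDataPageSketch {
+    static byte[] serialize(int valueCount, long firstRowId, byte[] nullBitmap, byte[] encodedData) {
+        ByteBuffer body = ByteBuffer.allocate(4 + 8 + 4 + nullBitmap.length + encodedData.length)
+                .order(ByteOrder.LITTLE_ENDIAN);
+        body.putInt(valueCount);          // value count
+        body.putLong(firstRowId);         // first row id
+        body.putInt(nullBitmap.length);   // bitmap length
+        body.put(nullBitmap);             // null bitmap
+        body.put(encodedData);            // encoded and compressed data
+
+        // page-granularity checksum over the header and the data
+        CRC32 crc = new CRC32();
+        crc.update(body.array(), 0, body.position());
+
+        ByteBuffer page = ByteBuffer.allocate(body.position() + 4).order(ByteOrder.LITTLE_ENDIAN);
+        page.put(body.array(), 0, body.position());
+        page.putInt((int) crc.getValue()); // checksum
+        return page.array();
+    }
+}
+```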
+
+### Bloom Filter Pages ###
+
+For each bloom filter column, a bloom filter page is generated per data page and saved in the bloom filter pages area.
+
+### Ordinal Index Page ###
+
+For each column, a sparse index of row numbers is built at page granularity. Each entry maps the row number of the first row of a page to a pointer to that page (offset and length).
+
+### Short Key Index page ###
+
+A sparse short key index entry is generated every N rows (configurable); each entry maps a short key to a row number (ordinal).
+
+### Column's other indexes ###
+
+The format is designed so that other index types, such as bitmap indexes and spatial indexes, can be added later: the required data only needs to be appended after the existing column data, and the corresponding metadata fields added to FileFooterPB.
+
+### Metadata Definition ###
+SegmentFooterPB is defined as:
+
+```
+message ColumnPB {
+    required int32 unique_id = 1;   // The column id is used here instead of the column name, because renaming columns is planned to be supported
+    optional string name = 2;       // Column name; when name equals __DORIS_DELETE_SIGN__, this column is a hidden delete column
+    required string type = 3;       // Column type
+    optional bool is_key = 4;       // Whether the column is a key column
+    optional string aggregation = 5;    // Aggregation type
+    optional bool is_nullable = 6;      // Whether the column allows null values
+    optional bytes default_value = 7;   // Default value
+    optional int32 precision = 8;       // Precision of the column
+    optional int32 frac = 9;
+    optional int32 length = 10;         // Length of the column
+    optional int32 index_length = 11;   // Length of the column index
+    optional bool is_bf_column = 12;    // Whether the column has a bloom filter index
+    optional bool has_bitmap_index = 15 [default=false];  // Whether the column has a bitmap index
+}
+
+// page pointer
+message PagePointerPB {
+    required uint64 offset = 1; // offset of the page in the segment file
+    required uint32 length = 2; // length of the page
+}
+
+message MetadataPairPB {
+    optional string key = 1;
+    optional bytes value = 2;
+}
+
+message ColumnMetaPB {
+    optional ColumnMessage encoding; // Encoding of the column
+
+    optional PagePointerPB dict_page; // Dictionary page
+    repeated PagePointerPB bloom_filter_pages; // Bloom filter pages
+    optional PagePointerPB ordinal_index_page; // Ordinal index page
+    optional PagePointerPB page_zone_map_page; // Page-level statistics (zone map) index data
+
+    optional PagePointerPB bitmap_index_page; // Bitmap index page
+
+    optional uint64 data_footprint; // The size of the indexes in the column
+    optional uint64 index_footprint; // The size of the data in the column
+    optional uint64 raw_data_footprint; // Raw column data size
+
+    optional CompressKind compress_kind; // Compression type of the column
+
+    optional ZoneMapPB column_zone_map; // Segment-level statistics (zone map) index data
+    repeated MetadataPairPB column_meta_datas;
+}
+
+message SegmentFooterPB {
+    optional uint32 version = 2 [default = 1]; // For version compatibility and upgrades
+    repeated ColumnPB schema = 5; // Schema of the columns
+    optional uint64 num_values = 4; // Number of rows saved in the file
+    optional uint64 index_footprint = 7; // Index size
+    optional uint64 data_footprint = 8; // Data size
+    optional uint64 raw_data_footprint = 8; // Raw data size
+
+    optional CompressKind compress_kind = 9 [default = COMPRESS_LZO]; // Compression type
+    repeated ColumnMetaPB column_metas = 10; // Column metadata
+    optional PagePointerPB key_index_page = 11; // Short key index page
+}
+
+```
+
+## Read-write logic ##
+
+### Write ###
+
+The general writing process is as follows:
+1. Write magic
+2. Generate a ColumnWriter for each column according to the schema. Each ColumnWriter obtains the encoding information for its type (configurable) and creates the corresponding encoder.
+3. Call encoder->add(value) to write data. Every K rows, a short key index entry is generated; if the current page satisfies certain conditions (its size exceeds 1MB or its row count reaches K), a new page is started and the finished page is cached in memory.
+4. Repeat step 3 until all data has been written, then flush the data of each column into the file in sequence (a simplified sketch of this loop follows the list).
+5. Generate FileFooterPB information and write it to the file.
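+
+The following is a simplified sketch of the per-column write loop described in steps 3 and 4; the `Encoder` interface and the K and 1MB thresholds are hypothetical stand-ins for the configurable values mentioned above, not the actual BE classes.
+
+```java
+// Hypothetical encoder interface, for illustration only; the real writer classes live in the Doris BE.
+interface Encoder {
+    void add(Object value);
+    long currentPageSize();
+    long currentPageRows();
+    byte[] finishPage();
+}
+
+class ColumnWriteLoopSketch {
+    static final long K = 1024;                    // row interval for short key entries / page rotation (configurable)
+    static final long PAGE_SIZE_LIMIT = 1 << 20;   // 1MB page size limit (configurable)
+
+    static void writeColumn(Iterable<Object> values, Encoder encoder, java.util.List<byte[]> pages) {
+        long ordinal = 0;
+        for (Object value : values) {
+            // every K rows a short key index entry would be recorded here (short key -> ordinal)
+            encoder.add(value);                    // step 3: encoder->add(value)
+            ordinal++;
+            if (encoder.currentPageSize() > PAGE_SIZE_LIMIT || encoder.currentPageRows() >= K) {
+                pages.add(encoder.finishPage());   // the finished page is cached in memory
+            }
+        }
+        pages.add(encoder.finishPage());           // flush the last, possibly partial, page
+    }
+}
+```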
+
+Relevant issues:
+
+- How is the short key index generated?
+	- A sparse short key index is still generated by row interval: one entry is kept every 1024 rows. Each entry maps: short key -> ordinal
+
+- What should be stored in the ordinal index?
+	- The mapping from the first ordinal of each page to that page's pointer
+- Which encodings can pages use?
+	- Dictionary Compression
+	- plain
+	- rle
+	- bshuf
+
+### Read ###
+
+1. Read the magic code of the file to determine the file type and version.
+2. Read the FileFooterPB and verify its checksum.
+3. Read the short key index and the ordinal index information of the required columns.
+4. Using the start key and end key, locate the row numbers to read through the short key index, then determine the row ranges to read through the ordinal index, and further filter those row ranges through statistics, bitmap indexes, and so on.
+5. Read the row data for the resulting row ranges through the ordinal index.
+
+Relevant issues:
+1. How to quickly locate a row within a page?
+
+	The data inside a page is encoded, so a row cannot be located in it directly. Different encodings need different schemes for fast row positioning within a page and have to be considered case by case:
+	- For RLE encoding, skip forward by parsing the RLE headers until the run containing the target row is reached, then decode within that run.
+	- For binary plain encoding, per-row offsets are stored in the page and their location is recorded in the page header. When reading, the offsets are first parsed into an array, so the data of any row can be located quickly through its offset (see the sketch after this list).
+2. How to read blocks efficiently? Can adjacent blocks be merged and read in a single pass?
+	At read time, check whether the blocks are contiguous; if they are, read them in one pass.
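+
+As an illustration of the binary plain case above, the following sketch locates one row through a parsed per-row offset array; the `offsets` and `pageData` arrays are illustrative stand-ins for what the page header points to, not the actual BE data structures.
+
+```java
+import java.util.Arrays;
+
+public class BinaryPlainPageSketch {
+    /**
+     * Returns the bytes of the row at rowIdWithinPage.
+     * offsets[i] is the start offset of row i inside pageData; in this sketch,
+     * offsets has (row count + 1) entries so the last entry marks the end of the last row.
+     */
+    static byte[] readRow(byte[] pageData, int[] offsets, int rowIdWithinPage) {
+        int start = offsets[rowIdWithinPage];
+        int end = offsets[rowIdWithinPage + 1];
+        return Arrays.copyOfRange(pageData, start, end);
+    }
+}
+```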
+
+## Coding ##
+
+In the existing Doris storage, string columns use plain encoding, which is inefficient. Comparison shows that in Baidu's statistics scenario, string encoding caused the data to expand by more than a factor of two. Therefore, dictionary-based encoding compression is planned.
+
+## Compression ##
+
+The format implements an extensible compression framework that supports multiple compression algorithms and makes it easy to add new ones; zstd compression is planned.
+
+## TODO ##
+1. How to support nested types? How to locate row numbers within nested types?
+2. How to optimize the downstream use of bitmap indexes and column statistics when ScanRanges are split?
diff --git a/docs/en/design/grouping_sets_design.md b/docs/en/design/grouping_sets_design.md
new file mode 100644
index 0000000000..16acc33997
--- /dev/null
+++ b/docs/en/design/grouping_sets_design.md
@@ -0,0 +1,501 @@
+---
+{
+    "title": "GROUPING SETS DESIGN",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+# GROUPING SETS DESIGN
+
+## 1. GROUPING SETS Background
+
+The `CUBE`, `ROLLUP`, and `GROUPING` `SETS` extensions to SQL make querying and reporting easier and faster. `CUBE`, `ROLLUP`, and grouping sets produce a single result set that is equivalent to a `UNION` `ALL` of differently grouped rows. `ROLLUP` calculates aggregations such as `SUM`, `COUNT`, `MAX`, `MIN`, and `AVG` at increasing levels of aggregation, from the most detailed up to a grand total. `CUBE` is an extension similar to `ROLLUP`, enabling a single statement to calculate all p [...]
+To enhance performance, `CUBE`, `ROLLUP`, and `GROUPING SETS` can be parallelized: multiple processes can simultaneously execute all of these statements. These capabilities make aggregate calculations more efficient, thereby enhancing database performance and scalability.
+
+The three `GROUPING` functions help you identify the group each row belongs to and enable sorting subtotal rows and filtering results.
+
+### 1.1 GROUPING SETS Syntax
+
+`GROUPING SETS` syntax lets you define multiple groupings in the same query. `GROUP BY` computes all the groupings specified and combines them with `UNION ALL`. For example, consider the following statement:
+
+```
+SELECT k1, k2, SUM( k3 ) FROM t GROUP BY GROUPING SETS ( (k1, k2), (k1), (k2), ( ) );
+```
+
+
+This statement is equivalent to:
+
+```
+SELECT k1, k2, SUM( k3 ) FROM t GROUP BY k1, k2
+UNION
+SELECT k1, null, SUM( k3 ) FROM t GROUP BY k1
+UNION
+SELECT null, k2, SUM( k3 ) FROM t GROUP BY k2
+UNION
+SELECT null, null, SUM( k3 ) FROM t
+```
+
+This is an example of real query:
+
+```
+mysql> SELECT * FROM t;
++------+------+------+
+| k1   | k2   | k3   |
++------+------+------+
+| a    | A    |    1 |
+| a    | A    |    2 |
+| a    | B    |    1 |
+| a    | B    |    3 |
+| b    | A    |    1 |
+| b    | A    |    4 |
+| b    | B    |    1 |
+| b    | B    |    5 |
++------+------+------+
+8 rows in set (0.01 sec)
+
+mysql> SELECT k1, k2, SUM(k3) FROM t GROUP BY GROUPING SETS ( (k1, k2), (k2), (k1), ( ) );
++------+------+-----------+
+| k1   | k2   | sum(`k3`) |
++------+------+-----------+
+| b    | B    |         6 |
+| a    | B    |         4 |
+| a    | A    |         3 |
+| b    | A    |         5 |
+| NULL | B    |        10 |
+| NULL | A    |         8 |
+| a    | NULL |         7 |
+| b    | NULL |        11 |
+| NULL | NULL |        18 |
++------+------+-----------+
+9 rows in set (0.06 sec)
+```
+
+### 1.2 ROLLUP Syntax
+
+`ROLLUP` enables a `SELECT` statement to calculate multiple levels of subtotals across a specified group of dimensions. It also calculates a grand total. `ROLLUP` is a simple extension to the `GROUP` `BY` clause, so its syntax is extremely easy to use. The `ROLLUP` extension is highly efficient, adding minimal overhead to a query.
+
+`ROLLUP` appears in the `GROUP` `BY` clause in a `SELECT` statement. Its form is:
+
+```
+SELECT a, b,c, SUM( d ) FROM tab1 GROUP BY ROLLUP(a,b,c)
+```
+
+This statement is equivalent to the following GROUPING SETS:
+
+```
+GROUPING SETS (
+(a,b,c),
+( a, b ),
+( a),
+( )
+)
+```
+
+### 1.3 CUBE Syntax
+
+Like `ROLLUP`, `CUBE` generates all the subtotals that could be calculated for a data cube with the specified dimensions.
+
+```
+SELECT a, b,c, SUM( d ) FROM tab1 GROUP BY CUBE(a,b,c)
+```
+
+For example, CUBE ( a, b, c ) is equivalent to the following GROUPING SETS:
+
+```
+GROUPING SETS (
+( a, b, c ),
+( a, b ),
+( a,    c ),
+( a       ),
+(    b, c ),
+(    b    ),
+(       c ),
+(         )
+)
+```
+
+### 1.4 GROUPING and GROUPING_ID Function
+
+`GROUPING` indicates whether a specified column expression in a `GROUP BY` list is aggregated or not. It returns 1 for aggregated or 0 for not aggregated in the result set. `GROUPING` can be used only in the `SELECT` list, `HAVING`, and `ORDER BY` clauses when `GROUP BY` is specified.
+
+`GROUPING_ID` describes which of a list of expressions are grouped in a row produced by a `GROUP BY` query. The `GROUPING_ID` function simply returns the decimal equivalent of the binary value formed as a result of the concatenation of the values returned by the `GROUPING` functions.
+
+Each `GROUPING_ID` argument must be an element of the `GROUP BY` list. `GROUPING_ID()` returns an **integer** bitmap whose lowest N bits may be lit. A lit **bit** indicates the corresponding argument is not a grouping column for the given output row. The lowest-order **bit** corresponds to argument N, and the highest of the N bits corresponds to argument 1. If the column is a grouping column, the bit is 0; otherwise it is 1.
+
+For example:
+
+```
+mysql> select * from t;
++------+------+------+
+| k1   | k2   | k3   |
++------+------+------+
+| a    | A    |    1 |
+| a    | A    |    2 |
+| a    | B    |    1 |
+| a    | B    |    3 |
+| b    | A    |    1 |
+| b    | A    |    4 |
+| b    | B    |    1 |
+| b    | B    |    5 |
++------+------+------+
+```
+
+grouping sets result:
+
+```
+mysql> SELECT k1, k2, GROUPING(k1), GROUPING(k2), SUM(k3) FROM t GROUP BY GROUPING SETS ( (k1, k2), (k2), (k1), ( ) );
++------+------+----------------+----------------+-----------+
+| k1   | k2   | grouping(`k1`) | grouping(`k2`) | sum(`k3`) |
++------+------+----------------+----------------+-----------+
+| a    | A    |              0 |              0 |         3 |
+| a    | B    |              0 |              0 |         4 |
+| a    | NULL |              0 |              1 |         7 |
+| b    | A    |              0 |              0 |         5 |
+| b    | B    |              0 |              0 |         6 |
+| b    | NULL |              0 |              1 |        11 |
+| NULL | A    |              1 |              0 |         8 |
+| NULL | B    |              1 |              0 |        10 |
+| NULL | NULL |              1 |              1 |        18 |
++------+------+----------------+----------------+-----------+
+9 rows in set (0.02 sec)
+
+mysql> SELECT k1, k2, GROUPING_ID(k1,k2), SUM(k3) FROM t GROUP BY GROUPING SETS ( (k1, k2), (k2), (k1), ( ) );
++------+------+-------------------------+-----------+
+| k1   | k2   | grouping_id(`k1`, `k2`) | sum(`k3`) |
++------+------+-------------------------+-----------+
+| a    | A    |                       0 |         3 |
+| a    | B    |                       0 |         4 |
+| a    | NULL |                       1 |         7 |
+| b    | A    |                       0 |         5 |
+| b    | B    |                       0 |         6 |
+| b    | NULL |                       1 |        11 |
+| NULL | A    |                       2 |         8 |
+| NULL | B    |                       2 |        10 |
+| NULL | NULL |                       3 |        18 |
++------+------+-------------------------+-----------+
+9 rows in set (0.02 sec)
+
+mysql> SELECT k1, k2, grouping(k1), grouping(k2), GROUPING_ID(k1,k2), SUM(k3) FROM t GROUP BY GROUPING SETS ( (k1, k2), (k2), (k1), ( ) ) order by k1, k2;
++------+------+----------------+----------------+-------------------------+-----------+
+| k1   | k2   | grouping(`k1`) | grouping(`k2`) | grouping_id(`k1`, `k2`) | sum(`k3`) |
++------+------+----------------+----------------+-------------------------+-----------+
+| a    | A    |              0 |              0 |                       0 |         3 |
+| a    | B    |              0 |              0 |                       0 |         4 |
+| a    | NULL |              0 |              1 |                       1 |         7 |
+| b    | A    |              0 |              0 |                       0 |         5 |
+| b    | B    |              0 |              0 |                       0 |         6 |
+| b    | NULL |              0 |              1 |                       1 |        11 |
+| NULL | A    |              1 |              0 |                       2 |         8 |
+| NULL | B    |              1 |              0 |                       2 |        10 |
+| NULL | NULL |              1 |              1 |                       3 |        18 |
++------+------+----------------+----------------+-------------------------+-----------+
+9 rows in set (0.02 sec)
+
+```
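+
+A small sketch of the bit arithmetic described in this section, assuming the convention above (argument N maps to the lowest-order bit); `groupByColumns` and `groupingSet` are illustrative names, not planner code.
+
+```java
+import java.util.List;
+import java.util.Set;
+
+public class GroupingIdSketch {
+    // Computes GROUPING_ID(groupByColumns...) for one grouping set:
+    // a bit is 1 when the corresponding column is NOT part of the grouping set.
+    static long groupingId(List<String> groupByColumns, Set<String> groupingSet) {
+        long id = 0;
+        for (String column : groupByColumns) {
+            id = (id << 1) | (groupingSet.contains(column) ? 0 : 1);
+        }
+        return id;
+    }
+
+    public static void main(String[] args) {
+        List<String> columns = List.of("k1", "k2");
+        System.out.println(groupingId(columns, Set.of("k1", "k2"))); // (k1, k2) -> 0
+        System.out.println(groupingId(columns, Set.of("k1")));       // (k1)     -> 1
+        System.out.println(groupingId(columns, Set.of("k2")));       // (k2)     -> 2
+        System.out.println(groupingId(columns, Set.of()));           // ()       -> 3
+    }
+}
+```
+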
+### 1.5 Composition and nesting of GROUPING SETS
+
+First of all, a GROUP BY clause is essentially a special case of GROUPING SETS, for example:
+
+```
+   GROUP BY a
+is equivalent to:
+   GROUP BY GROUPING SETS((a))
+also,
+   GROUP BY a,b,c
+is equivalent to:
+   GROUP BY GROUPING SETS((a,b,c))
+```
+
+Similarly, CUBE and ROLLUP can be expanded into GROUPING SETS, so the various combinations and nesting of GROUP BY, CUBE, ROLLUP, GROUPING SETS are essentially the combination and nesting of GROUPING SETS.
+
+Nesting of GROUPING SETS is semantically equivalent to writing the inner elements directly in the outer clause. The reference (<https://www.brytlyt.com/documentation/data-manipulation-dml/grouping-sets-rollup-cube/>) mentions:
+
+```
+The CUBE and ROLLUP constructs can be used either directly in the GROUP BY clause, or nested inside a GROUPING SETS clause. If one GROUPING SETS clause is nested inside another, the effect is the same as if all the elements of the inner clause had been written directly in the outer clause.
+```
+
+When a GROUP BY list combines multiple GROUPING SETS (or CUBE/ROLLUP) clauses, many databases treat the combination as a cross product.
+
+For example:
+
+```
+GROUP BY a, CUBE (b, c), GROUPING SETS ((d), (e))
+
+is equivalent to:
+
+GROUP BY GROUPING SETS (
+(a, b, c, d), (a, b, c, e),
+(a, b, d),    (a, b, e),
+(a, c, d),    (a, c, e),
+(a, d),       (a, e)
+)
+```
+
+Support for composing and nesting GROUPING SETS differs between databases. For example, Snowflake does not support any composition or nesting.
+(<https://docs.snowflake.net/manuals/sql-reference/constructs/group-by.html>)
+
+Oracle supports both composition and nesting.
+(<https://docs.oracle.com/cd/B19306_01/server.102/b14223/aggreg.htm#i1006842>)
+
+Presto supports composition, but not nesting.
+(<https://prestodb.github.io/docs/current/sql/select.html>)
+
+## 2. Objectives
+
+Support the `GROUPING SETS`, `ROLLUP` and `CUBE` syntax described in sections 1.1 through 1.5; the combination and nesting of GROUPING SETS is not supported in the current version.
+
+### 2.1 GROUPING SETS Syntax
+
+```
+SELECT ...
+FROM ...
+[ ... ]
+GROUP BY GROUPING SETS ( groupSet [ , groupSet [ , ... ] ] )
+[ ... ]
+
+groupSet ::= { ( expr  [ , expr [ , ... ] ] )}
+
+<expr>
+Expression, column name.
+```
+
+### 2.2 ROLLUP Syntax
+
+```
+SELECT ...
+FROM ...
+[ ... ]
+GROUP BY ROLLUP ( expr  [ , expr [ , ... ] ] )
+[ ... ]
+
+<expr>
+Expression, column name.
+```
+
+### 2.3 CUBE Syntax
+
+```
+SELECT ...
+FROM ...
+[ ... ]
+GROUP BY CUBE ( expr  [ , expr [ , ... ] ] )
+[ ... ]
+
+<expr>
+Expression, column name.
+```
+
+## 3. Implementation
+
+### 3.1 Overall Design Approaches
+
+`GROUPING SETS` is equivalent to a `UNION` of `GROUP BY` results, so we can expand the input rows and run a single GROUP BY over the expanded rows.
+
+For example:
+
+```
+SELECT a, b FROM src GROUP BY a, b GROUPING SETS ((a, b), (a), (b), ());
+```
+
+Data in table src:
+
+```
+1, 2
+3, 4
+```
+
+Based on the GROUPING SETS, we can expand the input to:
+
+```
+1, 2       (GROUPING_ID: a, b -> 00 -> 0)
+1, null    (GROUPING_ID: a, null -> 01 -> 1)
+null, 2    (GROUPING_ID: null, b -> 10 -> 2)
+null, null (GROUPING_ID: null, null -> 11 -> 3)
+
+3, 4       (GROUPING_ID: a, b -> 00 -> 0)
+3, null    (GROUPING_ID: a, null -> 01 -> 1)
+null, 4    (GROUPING_ID: null, b -> 10 -> 2)
+null, null (GROUPING_ID: null, null -> 11 -> 3)
+```
+
+Then use those rows as input and GROUP BY a, b, GROUPING_ID, as sketched below.
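+
+The following is a minimal sketch of the row expansion described above, assuming rows are simple Object arrays and each grouping set is a set of column names; it illustrates the approach only, not the planner or BE implementation.
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class GroupingSetsExpansionSketch {
+    // Expand one input row into one output row per grouping set, nulling out the
+    // columns that are not part of that grouping set and appending GROUPING_ID.
+    static List<Object[]> expand(Object[] row, List<String> groupByColumns, List<Set<String>> groupingSets) {
+        List<Object[]> expanded = new ArrayList<>();
+        for (Set<String> groupingSet : groupingSets) {
+            Object[] out = new Object[groupByColumns.size() + 1];
+            long groupingId = 0;
+            for (int i = 0; i < groupByColumns.size(); i++) {
+                boolean grouped = groupingSet.contains(groupByColumns.get(i));
+                groupingId = (groupingId << 1) | (grouped ? 0 : 1);
+                out[i] = grouped ? row[i] : null;    // null out non-grouping columns
+            }
+            out[groupByColumns.size()] = groupingId; // appended GROUPING_ID column
+            expanded.add(out);
+        }
+        return expanded;
+    }
+
+    public static void main(String[] args) {
+        // Row (1, 2) from table src, GROUPING SETS ((a, b), (a), (b), ())
+        List<Object[]> rows = expand(new Object[]{1, 2},
+                List.of("a", "b"),
+                List.of(Set.of("a", "b"), Set.of("a"), Set.of("b"), Set.of()));
+        for (Object[] r : rows) {
+            System.out.println(java.util.Arrays.toString(r));
+        }
+        // Prints [1, 2, 0], [1, null, 1], [null, 2, 2], [null, null, 3], one per line
+    }
+}
+```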
+
+### 3.2 Example
+
+Table t:
+
+```
+mysql> select * from t;
++------+------+------+
+| k1   | k2   | k3   |
++------+------+------+
+| a    | A    |    1 |
+| a    | A    |    2 |
+| a    | B    |    1 |
+| a    | B    |    3 |
+| b    | A    |    1 |
+| b    | A    |    4 |
+| b    | B    |    1 |
+| b    | B    |    5 |
++------+------+------+
+8 rows in set (0.01 sec)
+```
+
+for the query:
+
+```
+SELECT k1, k2, GROUPING_ID(k1,k2), SUM(k3) FROM t GROUP BY GROUPING SETS ((k1, k2), (k1), (k2), ());
+```
+
+First, expand the input: every row is expanded into 4 rows (the number of grouping sets), and a GROUPING_ID column is inserted.
+
+For example, the row (a, A, 1) is expanded to:
+
+```
++------+------+------+-------------------------+
+| k1   | k2   | k3   | GROUPING_ID(`k1`, `k2`) |
++------+------+------+-------------------------+
+| a    | A    |    1 |                       0 |
+| a    | NULL |    1 |                       1 |
+| NULL | A    |    1 |                       2 |
+| NULL | NULL |    1 |                       3 |
++------+------+------+-------------------------+
+```
+
+Finally, all rows are expanded as follows (32 rows):
+
+```
++------+------+------+-------------------------+
+| k1   | k2   | k3   | GROUPING_ID(`k1`, `k2`) |
++------+------+------+-------------------------+
+| a    | A    |    1 |                       0 |
+| a    | A    |    2 |                       0 |
+| a    | B    |    1 |                       0 |
+| a    | B    |    3 |                       0 |
+| b    | A    |    1 |                       0 |
+| b    | A    |    4 |                       0 |
+| b    | B    |    1 |                       0 |
+| b    | B    |    5 |                       0 |
+| a    | NULL |    1 |                       1 |
+| a    | NULL |    1 |                       1 |
+| a    | NULL |    2 |                       1 |
+| a    | NULL |    3 |                       1 |
+| b    | NULL |    1 |                       1 |
+| b    | NULL |    1 |                       1 |
+| b    | NULL |    4 |                       1 |
+| b    | NULL |    5 |                       1 |
+| NULL | A    |    1 |                       2 |
+| NULL | A    |    1 |                       2 |
+| NULL | A    |    2 |                       2 |
+| NULL | A    |    4 |                       2 |
+| NULL | B    |    1 |                       2 |
+| NULL | B    |    1 |                       2 |
+| NULL | B    |    3 |                       2 |
+| NULL | B    |    5 |                       2 |
+| NULL | NULL |    1 |                       3 |
+| NULL | NULL |    1 |                       3 |
+| NULL | NULL |    1 |                       3 |
+| NULL | NULL |    1 |                       3 |
+| NULL | NULL |    2 |                       3 |
+| NULL | NULL |    3 |                       3 |
+| NULL | NULL |    4 |                       3 |
+| NULL | NULL |    5 |                       3 |
++------+------+------+-------------------------+
+32 rows in set.
+```
+
+Now GROUP BY k1, k2, GROUPING_ID(k1, k2):
+
+```
++------+------+-------------------------+-----------+
+| k1   | k2   | grouping_id(`k1`, `k2`) | sum(`k3`) |
++------+------+-------------------------+-----------+
+| a    | A    |                       0 |         3 |
+| a    | B    |                       0 |         4 |
+| a    | NULL |                       1 |         7 |
+| b    | A    |                       0 |         5 |
+| b    | B    |                       0 |         6 |
+| b    | NULL |                       1 |        11 |
+| NULL | A    |                       2 |         8 |
+| NULL | B    |                       2 |        10 |
+| NULL | NULL |                       3 |        18 |
++------+------+-------------------------+-----------+
+9 rows in set (0.02 sec)
+```
+
+The result is equivalent to the following UNION ALL:
+
+```
+select k1, k2, sum(k3) from t group by k1, k2
+UNION ALL
+select NULL, k2, sum(k3) from t group by k2
+UNION ALL
+select k1, NULL, sum(k3) from t group by k1
+UNION ALL
+select NULL, NULL, sum(k3) from t;
+
++------+------+-----------+
+| k1   | k2   | sum(`k3`) |
++------+------+-----------+
+| b    | B    |         6 |
+| b    | A    |         5 |
+| a    | A    |         3 |
+| a    | B    |         4 |
+| a    | NULL |         7 |
+| b    | NULL |        11 |
+| NULL | B    |        10 |
+| NULL | A    |         8 |
+| NULL | NULL |        18 |
++------+------+-----------+
+9 rows in set (0.06 sec)
+```
+
+### 3.3 FE 
+
+#### 3.3.1 Tasks
+
+1. Add GroupByClause to replace groupingExprs.
+2. Add the Grouping Sets, Cube and RollUp syntax.
+3. Add GroupByClause to SelectStmt.
+4. Add GroupingFunctionCallExpr to implement the grouping and grouping_id function calls.
+5. Add VirtualSlot and generate the mapping between virtual slots and real slots.
+6. Add the virtual column GROUPING_ID and the other virtual columns generated by grouping and grouping_id, and insert them into groupingExprs.
+7. Add a PlanNode named RepeatNode. For GROUPING SETS aggregation, insert a RepeatNode into the plan.
+
+#### 3.3.2 Tuple
+
+In order to add GROUPING_ID to groupingExprs in GroupByClause, a virtual SlotRef needs to be created, along with a tuple for this slot, named the GROUPING\_\_ID tuple.
+
+For the plan node RepeatNode, its input is all the tuples of its children, and its output tuple contains the repeated data plus GROUPING_ID.
+
+
+#### 3.3.3 Expression and Function Substitution
+
+- `expr` -> `if(bitand(pos, grouping_id) = 0, expr, null)` for each expr in the grouping extension clause
+- `grouping_id()` -> `grouping_id(grouping_id)` for the grouping_id function
+
+### 3.4 BE
+
+#### 3.4.1 Tasks
+
+1. Add the RepeatNode executor, which expands the input data and appends GROUPING_ID to every row.
+2. Implement the grouping_id() and grouping() functions.
diff --git a/docs/en/design/metadata-design.md b/docs/en/design/metadata-design.md
new file mode 100644
index 0000000000..43a2fa89b1
--- /dev/null
+++ b/docs/en/design/metadata-design.md
@@ -0,0 +1,127 @@
+---
+{
+    "title": "Metadata Design Document",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Metadata Design Document
+
+## Noun Interpretation
+
+* FE: Frontend, the front-end node of Doris. Mainly responsible for receiving and returning client requests, metadata, cluster management, query plan generation and so on.
+* BE: Backend, the back-end node of Doris. Mainly responsible for data storage and management, query plan execution and other work.
+* bdbje: [Oracle Berkeley DB Java Edition](http://www.oracle.com/technetwork/database/berkeleydb/overview/index-093405.html). In Doris, we use bdbje to persist metadata operation logs and high availability of FE.
+
+## Overall architecture
+![](/images/palo_architecture.jpg)
+
+As shown above, Doris's overall architecture is divided into two layers. Multiple FEs form the first layer, providing horizontal scaling and high availability of FE. Multiple BEs form the second layer, responsible for data storage and management. This document mainly introduces the design and implementation of metadata in the FE layer.
+
+1. There are two different kinds of FE nodes: follower and observer. Leader election and data synchronization among FE nodes are handled by bdbje ([BerkeleyDB Java Edition](http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/overview/index-093405.html)).
+
+2. Follower nodes take part in elections, and one of the followers becomes the leader node, which is responsible for writing metadata. When the leader node goes down, the other follower nodes re-elect a leader to ensure high availability of the service.
+
+3. The observer node only synchronizes metadata from the leader node and does not participate in elections. It can be scaled horizontally to provide scalable metadata read services.
+
+> Note: The bdbje concepts corresponding to follower and observer are replica and observer. Both names may be used below.
+
+## Metadata structure
+
+Doris's metadata is held entirely in memory. A complete metadata image is maintained in the memory of each FE. Within Baidu, a cluster with 2,500 tables and 1 million tablets (3 million replicas) occupies only about 2GB of metadata memory. (Of course, the memory overhead of intermediate query objects and various job information needs to be estimated for the actual workload; even so, the memory overhead stays low.)
+
+At the same time, metadata is stored in the memory as a whole in a tree-like hierarchical structure. By adding auxiliary structure, metadata information at all levels can be accessed quickly.
+
+The following figure shows the contents stored in Doris meta-information.
+
+![](/images/metadata_contents.png)
+
+As shown above, Doris's metadata mainly stores four types of data:
+
+1. User data information, including databases, table schemas, sharding information, etc.
+2. All kinds of job information, such as import jobs, Clone jobs, SchemaChange jobs, etc.
+3. User and permission information.
+4. Cluster and node information.
+
+## Data stream
+
+![](/images/metadata_stream.png)
+
+The data flow of metadata is as follows:
+
+1. Only the leader FE can write metadata. After the leader's in-memory image is modified, the write operation is serialized into a log and written to bdbje as a key-value pair. The key is a continuously increasing integer used as the log id, and the value is the serialized operation log.
+
+2. After the log is written to bdbje, bdbje replicates it to the other non-leader FE nodes according to the configured policy (write majority / write all). The non-leader FE nodes replay the log to update their in-memory metadata image and thus stay synchronized with the leader's metadata.
+
+3. When the number of log entries on the leader node reaches a threshold (100,000 by default), the checkpoint thread is started. The checkpoint reads the existing image file and the subsequent logs and replays a new copy of the metadata image in memory. The copy is then written to disk to form a new image. The reason for regenerating a copy instead of writing the existing in-memory image directly is mainly that writing the image requires holding a read lock, which would block write operations. [...]
+
+4. After the image file is generated, the leader node notifies other non-leader nodes that a new image has been generated. Non-leader actively pulls the latest image files through HTTP to replace the old local files.
+
+5. The logs in bdbje will be deleted regularly after the image is completed.
+
+## Implementation details
+
+### Metadata catalogue
+
+1. The metadata directory is specified by the FE configuration item `meta_dir`.
+
+2. The `bdb/` directory is the data storage directory for bdbje.
+
+3. The `image/` directory is the storage directory for image files.
+
+* `image.[logid]` is the latest image file. The suffix `logid` indicates the id of the last log contained in the image.
+* `image.ckpt` is the image file being written. Once it is successfully written, it is renamed to `image.[logid]` and replaces the previous image file.
+* The `cluster_id` is recorded in the `VERSION` file. `cluster_id` uniquely identifies a Doris cluster. It is a 32-bit integer randomly generated the first time the leader starts. A cluster id can also be specified through the FE configuration item `cluster_id`.
+* The FE's own role is recorded in the `ROLE` file. The only roles are `FOLLOWER` and `OBSERVER`, where `FOLLOWER` denotes an electable node. (Note: even the leader node has the role `FOLLOWER`.)
+
+### Start-up process
+
+1. FE starts for the first time. If the startup script does not add any parameters, it will try to start as leader. You will eventually see `transfer from UNKNOWN to MASTER` in the FE startup log.
+
+2. FE starts for the first time, and the `-helper` parameter in the startup script points to the correct leader FE node. The FE first asks the leader node for its role (ROLE) and cluster_id over HTTP, then pulls the latest image file. After reading the image file and generating the metadata image, it starts bdbje and begins bdbje log synchronization. Once synchronization is complete, the logs after the image file in bdbje are replayed and the final metadata image is generated.
+
+	> Note 1: When starting with the `-helper` parameter, you first need to add the FE via a MySQL command on the leader; otherwise, startup will report an error.
+
+	> Note 2: `-helper` can point to any follower node, even if it is not leader.
+
+	> Note 3: During log synchronization, the FE log will show `xxx detached`. This means the log pull is in progress and is a normal phenomenon.
+
+3. FE is not starting for the first time, and the startup script has no extra parameters. The FE determines its identity from the locally stored ROLE information and obtains the leader information from the cluster information stored in the local bdbje. It then reads the local image file and the logs in bdbje to regenerate the metadata image. (If the role recorded in the local ROLE file is inconsistent with that recorded in bdbje, an error is reported.)
+
+4. FE is not starting for the first time, and the `-helper` parameter is specified in the startup script. As in the first-start process, the FE first asks the leader for the role, but compares it with its locally stored ROLE; if they are inconsistent, an error is reported.
+
+#### Metadata Read-Write and Synchronization
+
+1. Users can connect to any FE node over MySQL to read and write metadata. If the connection is to a non-leader node, that node forwards the write operation to the leader node. When the write succeeds on the leader, the leader returns its current, up-to-date log id. The non-leader node then waits until the log id it has replayed catches up with the returned log id before reporting success to the client. This approach guarantees Read-Your-Write semantics f [...]
+
+	> Note: Some non-write operations are also forwarded to the leader for execution, for example `SHOW LOAD`. These commands usually need to read the intermediate state of certain jobs; such state is not written to bdbje, so it does not exist in the memory of non-leader nodes. (Metadata synchronization between FEs depends entirely on bdbje log replay. If a metadata modification operation does not write a bdbje log, the result of the modification of the oper [...]
+
+2. The leader node starts a TimePrinter thread. This thread periodically writes a key-value entry containing the current time to bdbje. The non-leader nodes read the recorded time from the log during replay and compare it with their local time. If the lag between the recorded time and the local time is greater than the specified threshold (configuration item: `meta_delay_toleration_second`; the write interval is half of this configuration item), the node will be in the **unreadab [...]
+
+3. The metadata of each FE only guarantees eventual consistency. Normally, the window of inconsistency is only milliseconds. Within one session, monotonic consistency of metadata access is guaranteed, but if the same client connects to different FEs, metadata regression may be observed. (For a batch-update system, this problem has little impact.)
+
+### Downtime recovery
+
+1. When the leader node goes down, the rest of the followers will immediately elect a new leader node to provide services.
+2. Metadata cannot be written when the majority of follower nodes are down. While metadata is not writable, if a write request arrives, the current behavior is that the **FE process exits**. This logic will be optimized in the future so that read services are still provided in the non-writable state.
+3. The downtime of observer node will not affect the state of any other node. It also does not affect metadata reading and writing at other nodes.
diff --git a/docs/zh-CN/design/doris_storage_optimization.md b/docs/zh-CN/design/doris_storage_optimization.md
new file mode 100644
index 0000000000..f2816494e1
--- /dev/null
+++ b/docs/zh-CN/design/doris_storage_optimization.md
@@ -0,0 +1,234 @@
+---
+{
+    "title": "Doris存储文件格式优化",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Doris存储文件格式优化 #
+
+## 文件格式 ##
+
+![](/images/segment_v2.png)
+<center>图1. doris segment文件格式</center>
+
+文件包括:
+- 文件开始是8个字节的magic code,用于识别文件格式和版本
+- Data Region:用于存储各个列的数据信息,这里的数据是按需分page加载的
+- Index Region: doris中将各个列的index数据统一存储在Index Region,这里的数据会按照列粒度进行加载,所以跟列的数据信息分开存储
+- Footer信息
+	- FileFooterPB:定义文件的元数据信息
+	- 4个字节的footer pb内容的checksum
+	- 4个字节的FileFooterPB消息长度,用于读取FileFooterPB
+	- 8个字节的MAGIC CODE,之所以在末位存储,是方便不同的场景进行文件类型的识别
+
+文件中的数据按照page的方式进行组织,page是编码和压缩的基本单位。现在的page类型包括以下几种:
+
+### DataPage ###
+
+DataPage分为两种:nullable和non-nullable的data page。
+
+nullable的data page内容包括:
+```
+
+                 +----------------+
+                 |  value count   |
+                 |----------------|
+                 |  first row id  |
+                 |----------------|
+                 | bitmap length  |
+                 |----------------|
+                 |  null bitmap   |
+                 |----------------|
+                 |     data       |
+                 |----------------|
+                 |    checksum    |
+                 +----------------+
+```
+
+A non-nullable data page is structured as follows:
+
+```
+                 +----------------+
+                 |   value count  |
+                 |----------------|
+                 |  first row id  |
+                 |----------------|
+                 |     data       |
+                 |----------------|
+                 |    checksum    |
+                 +----------------+
+```
+
+The meaning of each field is as follows:
+
+- value count
+	- the number of rows in the page
+- first row id
+	- the row id of the first row in the page
+- bitmap length
+	- the number of bytes of the bitmap that follows
+- null bitmap
+	- the bitmap recording null information
+- data
+	- stores the data after encoding and compression
+	- the data header needs to record: is_compressed
+	- data with different encodings needs to record some fields in its header so that the data can be parsed
+		- TODO: add the header information for each encoding
+- checksum
+	- stores the page-level checksum, covering the page header and the actual data that follows
+
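+To make the layout above concrete, here is a minimal, hypothetical sketch of parsing such a nullable data page. It assumes fixed-width little-endian header fields and that the buffer spans exactly one page, purely for illustration; the actual on-disk encoding may differ (for example, varints).
+
+```java
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+// Illustrative parser for the nullable data page layout sketched above.
+// Fixed-width little-endian fields are an assumption made only to keep the
+// structure concrete; they are not necessarily the real on-disk encoding.
+public class NullableDataPage {
+    public int valueCount;
+    public long firstRowId;
+    public byte[] nullBitmap;
+    public byte[] encodedData;   // still encoded/compressed at this point
+    public int checksum;
+
+    // buf is assumed to contain exactly one page.
+    public static NullableDataPage parse(ByteBuffer buf) {
+        buf.order(ByteOrder.LITTLE_ENDIAN);
+        NullableDataPage page = new NullableDataPage();
+        page.valueCount = buf.getInt();          // value count
+        page.firstRowId = buf.getLong();         // first row id
+        int bitmapLength = buf.getInt();         // bitmap length
+        page.nullBitmap = new byte[bitmapLength];
+        buf.get(page.nullBitmap);                // null bitmap
+        page.encodedData = new byte[buf.remaining() - 4];
+        buf.get(page.encodedData);               // encoded (and possibly compressed) data
+        page.checksum = buf.getInt();            // page-level checksum
+        return page;
+    }
+}
+```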
+
+### Bloom Filter Pages ###
+
+For each bloom filter column, a bloom filter page is generated per data page and stored in the bloom filter pages region.
+
+### Ordinal Index Page ###
+
+For each column, a sparse ordinal (row id) index is built at page granularity. Its content maps the row id of the first row of each page to the pointer of that block (including offset and length).
+
+### Short Key Index page ###
+
+A sparse short key index entry is generated every N rows (configurable). The index maps: short key -> row id (ordinal).
+
+### Other Column Indexes ###
+
+The format is designed to support adding other index types later, such as bitmap indexes, spatial indexes, and so on. The required data only needs to be written after the existing column data, and the corresponding metadata fields added to FileFooterPB.
+
+### Metadata Definition ###
+SegmentFooterPB is defined as:
+
+```
+message ColumnPB {
+    required int32 unique_id = 1;   // column id is used instead of column name because renaming columns is planned to be supported
+    optional string name = 2;       // column name; when the name is __DORIS_DELETE_SIGN__, the column is the hidden delete column
+    required string type = 3;       // column type
+    optional bool is_key = 4;       // whether this is a key column
+    optional string aggregation = 5;    // aggregation method
+    optional bool is_nullable = 6;      // whether nulls are allowed
+    optional bytes default_value = 7;   // default value
+    optional int32 precision = 8;       // precision
+    optional int32 frac = 9;
+    optional int32 length = 10;         // length
+    optional int32 index_length = 11;   // index length
+    optional bool is_bf_column = 12;    // whether the column has a bloom filter
+    optional bool has_bitmap_index = 15 [default=false];  // whether the column has a bitmap index
+}
+
+// page pointer
+message PagePointerPB {
+    required uint64 offset = 1; // offset of the page in the file
+    required uint32 length = 2; // size of the page
+}
+
+message MetadataPairPB {
+    optional string key = 1;
+    optional bytes value = 2;
+}
+
+message ColumnMetaPB {
+    optional ColumnMessage encoding = 1; // encoding
+
+    optional PagePointerPB dict_page = 2;           // dictionary page
+    repeated PagePointerPB bloom_filter_pages = 3;  // bloom filter pages
+    optional PagePointerPB ordinal_index_page = 4;  // ordinal (row id) index data
+    optional PagePointerPB page_zone_map_page = 5;  // page-level zone map (statistics) index data
+
+    optional PagePointerPB bitmap_index_page = 6;   // bitmap index data
+
+    optional uint64 data_footprint = 7;     // size of the data in the column
+    optional uint64 index_footprint = 8;    // size of the indexes in the column
+    optional uint64 raw_data_footprint = 9; // size of the raw column data
+
+    optional CompressKind compress_kind = 10; // compression method of the column
+
+    optional ZoneMapPB column_zone_map = 11;  // file-level zone map used for filtering
+    repeated MetadataPairPB column_meta_datas = 12;
+}
+
+message SegmentFooterPB {
+    optional uint32 version = 2 [default = 1]; // for version compatibility and upgrades
+    repeated ColumnPB schema = 5;   // column schema
+    optional uint64 num_values = 4; // number of rows stored in the file
+    optional uint64 index_footprint = 7;    // index size
+    optional uint64 data_footprint = 8;     // data size
+    optional uint64 raw_data_footprint = 6; // raw data size
+
+    optional CompressKind compress_kind = 9 [default = COMPRESS_LZO]; // compression method
+    repeated ColumnMetaPB column_metas = 10;    // column metadata
+    optional PagePointerPB key_index_page = 11; // short key index page
+}
+
+```
+
+## Read and Write Logic ##
+
+### Writing ###
+
+The general write flow is as follows:
+1. Write the magic code
+2. Based on the schema, create a ColumnWriter for each column. Each ColumnWriter obtains the encoding information for its type (configurable) and creates the corresponding encoder
+3. Call encoder->add(value) to write data. Every K rows, generate a short key index entry; in addition, if the current page meets certain conditions (its size exceeds 1MB or it holds K rows), start a new page and keep the finished page cached in memory
+4. Repeat step 3 until all data is written, then flush the data of each column into the file in order
+5. Generate the FileFooterPB and write it to the file
+
+Related questions:
+
+- How is the short key index generated?
+	- A sparse short key index is still generated every fixed number of rows, currently one entry every 1024 rows; its content is: short key -> ordinal
+
+- What should the ordinal index store?
+	- The mapping from the first ordinal of each page to the page pointer
+- What is stored in pages of different encoding types?
+	- dictionary encoding
+	- plain
+	- rle
+	- bshuf
+
+### Reading ###
+
+1. Read the magic code of the file and determine the file type and version
+2. Read the FileFooterPB and verify its checksum
+3. For the required columns, read the short key index and the ordinal index of each column
+4. Use the start key and end key to locate the row ids to read through the short key index, then determine the row ranges to read through the ordinal index; the row ranges are further filtered by statistics, bitmap indexes, and so on (a lookup sketch follows at the end of this section)
+5. Read the row data for those row ranges through the ordinal index
+
+Related questions:
+1. How can a specific row be located quickly inside a page?
+
+	Data inside a page is encoded, so row-level positioning cannot be done directly. Different encodings require different approaches for fast row positioning and need to be analyzed individually:
+	- For rle encoding, skip by parsing the rle headers until reaching the rle block containing the target row, then decode it.
+	- For binary plain encoding, offset information is stored in the page and the offset of that offset information is recorded in the page header. When reading, the offsets are first parsed into an array, so the data of any row in the block can be located quickly through the per-row offsets.
+2. How can blocks be read efficiently? Can adjacent blocks be merged and read in one pass?
+	When reading, check whether the blocks are contiguous; if they are, read them in a single pass.
+
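+The ordinal index lookup mentioned in step 4 is essentially a search over the sorted list of first ordinals. The following is a minimal, self-contained sketch; the class name and index layout are illustrative, not the actual implementation.
+
+```java
+import java.util.Arrays;
+
+// Minimal sketch of locating the page that contains a given row id using the
+// sparse ordinal index described above (first ordinal of each page -> page).
+public class OrdinalIndexLookup {
+    // firstOrdinals[i] is the row id of the first row stored in page i, sorted ascending.
+    private final long[] firstOrdinals;
+
+    public OrdinalIndexLookup(long[] firstOrdinals) {
+        this.firstOrdinals = firstOrdinals;
+    }
+
+    /** Returns the index of the page containing rowId, or -1 if rowId precedes the first page. */
+    public int findPage(long rowId) {
+        int pos = Arrays.binarySearch(firstOrdinals, rowId);
+        // An exact hit points at the page directly; otherwise take the page whose
+        // first ordinal is the greatest value not exceeding rowId.
+        return pos >= 0 ? pos : (-pos - 1) - 1;
+    }
+
+    public static void main(String[] args) {
+        OrdinalIndexLookup index = new OrdinalIndexLookup(new long[]{0, 1024, 2048, 3072});
+        System.out.println(index.findPage(1500)); // -> 1, i.e. the page starting at ordinal 1024
+    }
+}
+```
+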
+## Encoding ##
+
+In the current Doris storage, string columns are encoded with plain encoding, which is inefficient. Comparisons show that in Baidu Tongji (Baidu Analytics) scenarios, data more than doubles in size because of the string encoding. Therefore, dictionary-based encoding and compression is planned.
+
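+As an illustration of the general idea only (not the actual Doris dictionary page format), a toy dictionary encoder might look like this:
+
+```java
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+// Toy illustration of dictionary encoding for string columns: repeated values
+// are stored once in a dictionary and the column stores small integer codes.
+// This is a conceptual sketch, not the Doris dict page implementation.
+public class DictEncoder {
+    private final Map<String, Integer> dict = new LinkedHashMap<>();
+    private final List<Integer> codes = new ArrayList<>();
+
+    public void add(String value) {
+        Integer code = dict.get(value);
+        if (code == null) {
+            code = dict.size();
+            dict.put(value, code);
+        }
+        codes.add(code); // the codes can be further bit-packed or RLE-encoded
+    }
+
+    public static void main(String[] args) {
+        DictEncoder enc = new DictEncoder();
+        for (String s : new String[]{"beijing", "shanghai", "beijing", "beijing"}) {
+            enc.add(s);
+        }
+        System.out.println(enc.dict);  // {beijing=0, shanghai=1}
+        System.out.println(enc.codes); // [0, 1, 0, 0]
+    }
+}
+```
+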
+## Compression ##
+
+Implement an extensible compression framework that supports multiple compression algorithms, so that new algorithms can be added easily later; zstd compression is planned.
+
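+A hypothetical sketch of what such a pluggable codec interface could look like, using the JDK's zlib Deflater as a stand-in implementation; a zstd codec could be plugged in the same way. The interface and class names are assumptions for illustration, not the actual framework.
+
+```java
+import java.util.zip.Deflater;
+
+// Hypothetical pluggable codec interface; names are illustrative only.
+interface BlockCompressionCodec {
+    String name();
+    byte[] compress(byte[] input);
+}
+
+// Example implementation backed by the JDK's Deflater (zlib).
+class ZlibCodec implements BlockCompressionCodec {
+    @Override
+    public String name() {
+        return "zlib";
+    }
+
+    @Override
+    public byte[] compress(byte[] input) {
+        Deflater deflater = new Deflater();
+        deflater.setInput(input);
+        deflater.finish();
+        // Sized generously for a sketch; a production codec would loop until finished().
+        byte[] buf = new byte[input.length + input.length / 1000 + 64];
+        int len = deflater.deflate(buf);
+        deflater.end();
+        byte[] out = new byte[len];
+        System.arraycopy(buf, 0, out, 0, len);
+        return out;
+    }
+}
+```
+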
+## TODO ##
+1. How should nested types be implemented? How can row ids be located within nested types?
+2. How can we avoid the repeated evaluation of downstream bitmap and column statistic filtering caused by the current ScanRange splitting?
diff --git a/docs/zh-CN/design/flink_doris_connector_design.md b/docs/zh-CN/design/flink_doris_connector_design.md
new file mode 100644
index 0000000000..ca91982d37
--- /dev/null
+++ b/docs/zh-CN/design/flink_doris_connector_design.md
@@ -0,0 +1,272 @@
+---
+{
+    "title": "Flink Doris Connector设计方案",
+    "language": "zh-CN"
+}
+
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink Doris Connector Design
+
+First of all, thanks to the author of the community's Spark Doris Connector.
+
+From the Doris point of view, bringing its data into Flink opens up Flink's rich ecosystem, broadens what can be built on top of the product, and makes joint queries between Doris and other data sources possible.
+
+Starting from our business architecture and requirements, we chose Flink as part of our architecture, as the framework for data ETL and real-time computation. Since the community already supports the Spark Doris Connector, we designed and developed the Flink Doris Connector by referring to it.
+
+## Technology Selection
+
+When we started the selection, just like the Spark Doris Connector, we first considered the JDBC approach. As the Spark Doris Connector article says, this approach has advantages, but its drawbacks are more obvious. Later we read and tested the Spark code and decided to stand on the shoulders of giants (note: by copying the code and modifying it).
+
+The following content comes from the Spark Doris Connector blog and is copied directly:
+
+```
+So we developed a new data source for Doris, the Spark-Doris-Connector. With this approach, Doris can expose its data distribution to Spark. The Spark driver accesses Doris FE to obtain the schema of the Doris table and the underlying data distribution. Then, based on this data distribution, the data-scanning tasks are assigned to the executors appropriately. Finally, the Spark executors access different BEs to run the queries, which greatly improves query efficiency.
+```
+
+## Usage
+
+Build doris-flink-1.0.0-SNAPSHOT.jar under the extension/flink-doris-connector/ directory of the Doris code base, add this jar to Flink's classpath, and the Flink-on-Doris features are ready to use.
+
+#### SQL
+
+Supported features:
+
+1. Reading data from tables in the Doris warehouse into Flink for computation through Flink SQL
+2. Inserting data into the corresponding warehouse tables through Flink SQL; the backend implementation communicates directly with the BE via Stream Load to complete the insert
+3. Joining Doris tables with external, non-Doris data sources in Flink for combined analysis
+
+Example:
+
+```java
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        tEnv.executeSql(
+                "CREATE TABLE test_aggregation01 (" +
+                        "user_id STRING," +
+                        "user_city STRING," +
+                        "age INT," +
+                        "last_visit_date STRING" +
+                        ") " +
+                        "WITH (\n" +
+                        "  'connector' = 'doris',\n" +
+                        "  'fenodes' = 'doris01:8030',\n" +
+                        "  'table.identifier' = 'demo.test_aggregation',\n" +
+                        "  'username' = 'root',\n" +
+                        "  'password' = ''\n" +
+                        ")");
+        tEnv.executeSql(
+                "CREATE TABLE test_aggregation02 (" +
+                        "user_id STRING," +
+                        "user_city STRING," +
+                        "age INT," +
+                        "last_visit_date STRING" +
+                        ") " +
+                        "WITH (\n" +
+                        "  'connector' = 'doris',\n" +
+                        "  'fenodes' = 'doris01:8030',\n" +
+                        "  'table.identifier' = 'demo.test_aggregation_01',\n" +
+                        "  'username' = 'root',\n" +
+                        "  'password' = ''\n" +
+                        ")");
+
+        tEnv.executeSql("INSERT INTO test_aggregation02 select * from test_aggregation01");
+        tEnv.executeSql("select count(1) from test_aggregation01");
+```
+
+#### DataStream
+
+```java
+DorisOptions.Builder options = DorisOptions.builder()
                .setFenodes("$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
+                .setUsername("$YOUR_DORIS_USERNAME")
+                .setPassword("$YOUR_DORIS_PASSWORD")
+                .setTableIdentifier("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME");
+env.addSource(new DorisSourceFunction<>(options.build(),new SimpleListDeserializationSchema())).print();
+```
+
+## Application Scenarios
+
+
+
+![1616987965864](/images/Flink-doris-connector.png)
+
+
+
+#### 1. Joint analysis of Doris data and other data sources with Flink
+
+Many business teams spread their data across different storage systems: for example, some online-analytics and reporting data lives in Doris, some structured retrieval data lives in Elasticsearch, some transactional data lives in MySQL, and so on. Businesses often need to analyze across several of these stores. After Flink and Doris are connected through the Flink Doris Connector, the business can use Flink directly to run joint queries over Doris data and multiple external data sources.
+
+#### 2. Real-time data ingestion
+
+Before the Flink Doris Connector: for irregular business data, messages often had to be normalized, filtered for nulls, and written to a new topic, and only then could a Routine Load job be started to write them into Doris.
+
+![1616988281677](/images/Flink-doris-connector1.png)
+
+After the Flink Doris Connector: Flink reads Kafka and writes directly into Doris.
+
+![1616988514873](/images/Flink-doris-connector2.png)
+
+
+
+## Technical Implementation
+
+### Architecture Diagram
+
+![1616997396610](/images/Flink-doris-connector-architecture.png)
+
+
+
+### Doris Exposes More Capabilities
+
+#### Doris FE
+
+Interfaces are exposed for obtaining the metadata of internal tables, the query plan for a single table, and some statistics.
+
+All REST API interfaces require HTTP Basic authentication. The username and password are those used to log in to the database, so pay attention to assigning privileges correctly.
+
+```
+// get the schema metadata of a table
+GET api/{database}/{table}/_schema
+
+// get the query plan template for a single table
+POST api/{database}/{table}/_query_plan
+{
+"sql": "select k1, k2 from {database}.{table}"
+}
+
+// get the table row count
+GET api/{database}/{table}/_count
+```
+
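+For illustration, a minimal Java snippet calling the _schema endpoint with HTTP Basic authentication might look like the following; the host, port, database, table, and credentials are placeholders.
+
+```java
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+// Minimal sketch of calling the FE _schema REST endpoint with HTTP Basic auth.
+// "doris01", port 8030 and the credentials are placeholders for illustration.
+public class FeSchemaClient {
+    public static void main(String[] args) throws Exception {
+        URL url = new URL("http://doris01:8030/api/demo/test_aggregation/_schema");
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        String auth = Base64.getEncoder()
+                .encodeToString("root:".getBytes(StandardCharsets.UTF_8));
+        conn.setRequestProperty("Authorization", "Basic " + auth);
+        try (BufferedReader reader = new BufferedReader(
+                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                System.out.println(line); // JSON description of the table schema
+            }
+        }
+    }
+}
+```
+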
+#### Doris BE
+
+
+Data filtering, scanning, and pruning capabilities are exposed directly through the Thrift protocol.
+
+```
+service TDorisExternalService {
+    // initialize the query executor
+    TScanOpenResult open_scanner(1: TScanOpenParams params);
+
+    // fetch data in streaming batches, in Apache Arrow format
+    TScanBatchResult get_next(1: TScanNextBatchParams params);
+
+    // finish the scan
+    TScanCloseResult close_scanner(1: TScanCloseParams params);
+}
+```
+
+The related Thrift struct definitions can be found at:
+
+https://github.com/apache/incubator-doris/blob/master/gensrc/thrift/DorisExternalService.thrift
+
+ 
+
+### Implementing the DataStream Source
+
+Extend org.apache.flink.streaming.api.functions.source.RichSourceFunction to create a custom DorisSourceFunction. At initialization, it obtains the query plan of the table and the corresponding partitions.
+
+Override the run method and read data from the partitions in a loop.
+
+```java
+public void run(SourceContext sourceContext){
       // read each partition in a loop
+        for(PartitionDefinition partitions : dorisPartitions){
+            scalaValueReader = new ScalaValueReader(partitions, settings);
+            while (scalaValueReader.hasNext()){
+                Object next = scalaValueReader.next();
+                sourceContext.collect(next);
+            }
+        }
+}
+```
+
+
+
+### Implementing Flink SQL on Doris
+
+Referring to [Flink user-defined sources & sinks](https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/sourceSinks.html) and the flink-jdbc-connector, we achieved the following: Doris tables can be operated on directly with Flink SQL, for both reads and writes.
+
+#### Implementation Details
+
+1. Implement DynamicTableSourceFactory and DynamicTableSinkFactory to register the doris connector
+
+2. Implement custom DynamicTableSource and DynamicTableSink to produce the logical plan
+
+3. Once DorisRowDataInputFormat and DorisDynamicOutputFormat obtain the logical plan, execution starts.
+
+![1616747472136](/images/table_connectors.svg)
+
+
+
+The core of the implementation is DorisRowDataInputFormat and DorisDynamicOutputFormat, customized on top of RichInputFormat and RichOutputFormat.
+
+In DorisRowDataInputFormat, the obtained dorisPartitions are split into multiple splits in createInputSplits for parallel computation.
+
+```java
+public DorisTableInputSplit[] createInputSplits(int minNumSplits) {
+		List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
+		int splitNum = 0;
+		for (PartitionDefinition partition : dorisPartitions) {
+			dorisSplits.add(new DorisTableInputSplit(splitNum++,partition));
+		}
+		return dorisSplits.toArray(new DorisTableInputSplit[0]);
+}
+ 
+
+public RowData nextRecord(RowData reuse)  {
+		if (!hasNext) {
            // all data has been read, return null
+			return null;
+		}
+		List next = (List)scalaValueReader.next();
+		GenericRowData genericRowData = new GenericRowData(next.size());
+		for(int i =0;i<next.size();i++){
+			genericRowData.setField(i, next.get(i));
+		}
+		// check whether there is more data
+		hasNext = scalaValueReader.hasNext();
+		return genericRowData;
+}
+
+```
+
+
+
+In DorisDynamicOutputFormat, data is written into Doris via Stream Load. The Stream Load code refers to org.apache.doris.plugin.audit.DorisStreamLoader.
+
+```java
+public  void writeRecord(RowData row) throws IOException {
       // the default Stream Load column separator is \t
+        StringJoiner value = new StringJoiner("\t");
+        GenericRowData rowData = (GenericRowData) row;
+        for(int i = 0; i < row.getArity(); ++i) {
+            value.add(rowData.getField(i).toString());
+        }
        // write data via Stream Load
+        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value.toString());
+        System.out.println(loadResponse);
+}
+```
+
+
diff --git a/docs/zh-CN/design/grouping_sets_design.md b/docs/zh-CN/design/grouping_sets_design.md
new file mode 100644
index 0000000000..0c19094cff
--- /dev/null
+++ b/docs/zh-CN/design/grouping_sets_design.md
@@ -0,0 +1,517 @@
+---
+{
+    "title": "GROUPING SETS 设计文档",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# GROUPING SETS Design Document
+
+## 1. Background on GROUPING SETS
+
+### 1.1 The GROUPING SETS Clause
+
+GROUP BY GROUPING SETS is an extension of the GROUP BY clause. It can group by several sets of columns in a single GROUP BY clause. Its result is equivalent to the UNION of the corresponding individual GROUP BY clauses.
+
+In particular, an empty grouping set means that all rows are aggregated into a single group.
+The GROUP BY clause is a special case of GROUP BY GROUPING SETS containing only one element.
+
+For example, the GROUPING SETS statement:
+
+```
+SELECT k1, k2, SUM( k3 ) FROM t GROUP BY GROUPING SETS ( (k1, k2), (k1), (k2), ( ) );
+```
+
+Its query result is equivalent to:
+
+```
+SELECT k1, k2, SUM( k3 ) FROM t GROUP BY k1, k2
+UNION
+SELECT k1, null, SUM( k3 ) FROM t GROUP BY k1
+UNION
+SELECT null, k2, SUM( k3 ) FROM t GROUP BY k2
+UNION
+SELECT null, null, SUM( k3 ) FROM t
+```
+
+Here is an example with actual data:
+
+```
+mysql> SELECT * FROM t;
++------+------+------+
+| k1   | k2   | k3   |
++------+------+------+
+| a    | A    |    1 |
+| a    | A    |    2 |
+| a    | B    |    1 |
+| a    | B    |    3 |
+| b    | A    |    1 |
+| b    | A    |    4 |
+| b    | B    |    1 |
+| b    | B    |    5 |
++------+------+------+
+8 rows in set (0.01 sec)
+
+mysql> SELECT k1, k2, SUM(k3) FROM t GROUP BY GROUPING SETS ( (k1, k2), (k2), (k1), ( ) );
++------+------+-----------+
+| k1   | k2   | sum(`k3`) |
++------+------+-----------+
+| b    | B    |         6 |
+| a    | B    |         4 |
+| a    | A    |         3 |
+| b    | A    |         5 |
+| NULL | B    |        10 |
+| NULL | A    |         8 |
+| a    | NULL |         7 |
+| b    | NULL |        11 |
+| NULL | NULL |        18 |
++------+------+-----------+
+9 rows in set (0.06 sec)
+```
+
+### 1.2 The ROLLUP Clause
+
+ROLLUP is an extension of GROUPING SETS.
+
+```
+SELECT a, b,c, SUM( d ) FROM tab1 GROUP BY ROLLUP(a,b,c)
+```
+
+This ROLLUP is equivalent to the following GROUPING SETS:
+
+```
+GROUPING SETS (
+(a,b,c),
+( a, b ),
+( a),
+( )
+)
+```
+
+### 1.3 The CUBE Clause
+
+CUBE is also an extension of GROUPING SETS.
+
+```
+CUBE ( e1, e2, e3, ... )
+```
+
+It means grouping by every subset of the listed expressions.
+
+For example, CUBE ( a, b, c ) is equivalent to the following GROUPING SETS:
+
+```
+GROUPING SETS (
+( a, b, c ),
+( a, b ),
+( a,    c ),
+( a       ),
+(    b, c ),
+(    b    ),
+(       c ),
+(         )
+)
+```
+
+### 1.4 The GROUPING and GROUPING_ID Functions
+
+When a column is not part of the grouping of a result row, its value is shown as NULL, but the column may also contain genuine NULL values, so we need a way to tell whether a NULL means "not grouped on" or is a real NULL. The GROUPING and GROUPING_ID functions are introduced for this purpose.
+GROUPING(column) distinguishes whether a single column in a result row is a regular grouping column or an aggregated (rolled-up) column: it returns 1 for an aggregated column and 0 otherwise. GROUPING() takes exactly one column argument.
+
+GROUPING_ID(column1, column2, ...) computes a bitmap over the listed columns, in the given column order: a bit is 0 if the column is a grouping column of that row and 1 if it is aggregated. GROUPING_ID() returns the decimal value of this bit vector.
+For example, [0 1 0] -> 2. The third query below shows this correspondence.
+
+For example, for the following table:
+
+```
+mysql> select * from t;
++------+------+------+
+| k1   | k2   | k3   |
++------+------+------+
+| a    | A    |    1 |
+| a    | A    |    2 |
+| a    | B    |    1 |
+| a    | B    |    3 |
+| b    | A    |    1 |
+| b    | A    |    4 |
+| b    | B    |    1 |
+| b    | B    |    5 |
++------+------+------+
+```
+
+The grouping sets result is as follows:
+
+```
+mysql> SELECT k1, k2, GROUPING(k1), GROUPING(k2), SUM(k3) FROM t GROUP BY GROUPING SETS ( (k1, k2), (k2), (k1), ( ) );
++------+------+----------------+----------------+-----------+
+| k1   | k2   | grouping(`k1`) | grouping(`k2`) | sum(`k3`) |
++------+------+----------------+----------------+-----------+
+| a    | A    |              0 |              0 |         3 |
+| a    | B    |              0 |              0 |         4 |
+| a    | NULL |              0 |              1 |         7 |
+| b    | A    |              0 |              0 |         5 |
+| b    | B    |              0 |              0 |         6 |
+| b    | NULL |              0 |              1 |        11 |
+| NULL | A    |              1 |              0 |         8 |
+| NULL | B    |              1 |              0 |        10 |
+| NULL | NULL |              1 |              1 |        18 |
++------+------+----------------+----------------+-----------+
+9 rows in set (0.02 sec)
+
+mysql> SELECT k1, k2, GROUPING_ID(k1,k2), SUM(k3) FROM t GROUP BY GROUPING SETS ( (k1, k2), (k2), (k1), ( ) );
++------+------+-------------------------+-----------+
+| k1   | k2   | grouping_id(`k1`, `k2`) | sum(`k3`) |
++------+------+-------------------------+-----------+
+| a    | A    |                       0 |         3 |
+| a    | B    |                       0 |         4 |
+| a    | NULL |                       1 |         7 |
+| b    | A    |                       0 |         5 |
+| b    | B    |                       0 |         6 |
+| b    | NULL |                       1 |        11 |
+| NULL | A    |                       2 |         8 |
+| NULL | B    |                       2 |        10 |
+| NULL | NULL |                       3 |        18 |
++------+------+-------------------------+-----------+
+9 rows in set (0.02 sec)
+
+mysql> SELECT k1, k2, grouping(k1), grouping(k2), GROUPING_ID(k1,k2), SUM(k3) FROM t GROUP BY GROUPING SETS ( (k1, k2), (k2), (k1), ( ) ) order by k1, k2;
++------+------+----------------+----------------+-------------------------+-----------+
+| k1   | k2   | grouping(`k1`) | grouping(`k2`) | grouping_id(`k1`, `k2`) | sum(`k3`) |
++------+------+----------------+----------------+-------------------------+-----------+
+| a    | A    |              0 |              0 |                       0 |         3 |
+| a    | B    |              0 |              0 |                       0 |         4 |
+| a    | NULL |              0 |              1 |                       1 |         7 |
+| b    | A    |              0 |              0 |                       0 |         5 |
+| b    | B    |              0 |              0 |                       0 |         6 |
+| b    | NULL |              0 |              1 |                       1 |        11 |
+| NULL | A    |              1 |              0 |                       2 |         8 |
+| NULL | B    |              1 |              0 |                       2 |        10 |
+| NULL | NULL |              1 |              1 |                       3 |        18 |
++------+------+----------------+----------------+-------------------------+-----------+
+9 rows in set (0.02 sec)
+
+```
+
+### 1.5 Combining and Nesting GROUPING SETS
+
+First, a GROUP BY clause is essentially a special case of GROUPING SETS, for example:
+
+```
+   GROUP BY a
+is equivalent to
+   GROUP BY GROUPING SETS((a))
+Similarly,
+   GROUP BY a,b,c
+is equivalent to
+   GROUP BY GROUPING SETS((a,b,c))
+```
+
+Likewise, CUBE and ROLLUP can also be expanded into GROUPING SETS, so the various combinations and nestings of GROUP BY, CUBE, ROLLUP, and GROUPING SETS are essentially combinations and nestings of GROUPING SETS.
+
+For nested GROUPING SETS, the semantics are equivalent to writing the nested clauses directly in the outer clause. (Reference: <https://www.brytlyt.com/documentation/data-manipulation-dml/grouping-sets-rollup-cube/>), which says:
+
+```
+The CUBE and ROLLUP constructs can be used either directly in the GROUP BY clause, or nested inside a GROUPING SETS clause. If one GROUPING SETS clause is nested inside another, the effect is the same as if all the elements of the inner clause had been written directly in the outer clause.
+```
+
+For a combined list of multiple GROUPING SETS, many databases treat them as a cross product.
+
+For example:
+
+```
+GROUP BY a, CUBE (b, c), GROUPING SETS ((d), (e))
+
+is equivalent to:
+
+GROUP BY GROUPING SETS (
+(a, b, c, d), (a, b, c, e),
+(a, b, d),    (a, b, e),
+(a, c, d),    (a, c, e),
+(a, d),       (a, e)
+)
+```
+
+Support for combining and nesting GROUPING SETS differs across databases. For example, Snowflake does not support any combination or nesting.
+(<https://docs.snowflake.net/manuals/sql-reference/constructs/group-by.html>)
+
+Oracle supports both combination and nesting.
+(<https://docs.oracle.com/cd/B19306_01/server.102/b14223/aggreg.htm#i1006842>)
+
+Presto supports combination but not nesting.
+(<https://prestodb.github.io/docs/current/sql/select.html>)
+
+## 2. Design Goals
+
+Support GROUPING SETS, ROLLUP, and CUBE syntactically, implementing sections 1.1, 1.2, 1.3, and 1.4 above.
+
+The combination and nesting of GROUPING SETS described in section 1.5 will not be implemented for now.
+
+The concrete syntax is listed below:
+
+### 2.1 GROUPING SETS Syntax
+
+```
+SELECT ...
+FROM ...
+[ ... ]
+GROUP BY GROUPING SETS ( groupSet [ , groupSet [ , ... ] ] )
+[ ... ]
+
+groupSet ::= { ( expr  [ , expr [ , ... ] ] )}
+
+<expr>
+Any expression, including column references.
+
+```
+
+### 2.2 ROLLUP Syntax
+
+```
+SELECT ...
+FROM ...
+[ ... ]
+GROUP BY ROLLUP ( expr  [ , expr [ , ... ] ] )
+[ ... ]
+
+<expr>
+Any expression, including column references.
+
+```
+
+### 2.3 CUBE Syntax
+
+```
+SELECT ...
+FROM ...
+[ ... ]
+GROUP BY CUBE ( expr  [ , expr [ , ... ] ] )
+[ ... ]
+
+<expr>
+Any expression, including column references.
+
+```
+
+## 3. Implementation
+
+### 3.1 Overall Approach
+
+Since a GROUPING SETS clause is logically equivalent to the UNION of the corresponding GROUP BY clauses, the goal can be achieved by expanding the input rows (which have already been filtered by pushed-down predicates and projected) and then performing a single GROUP BY on the expanded rows.
+
+The key question is how to expand the input rows. Here is an example:
+
+For the following statement:
+
+```
+SELECT a, b FROM src GROUP BY a, b GROUPING SETS ((a, b), (a), (b), ());
+
+```
+
+Assume the src table contains the following data:
+
+```
+1, 2
+3, 4
+
+```
+
+According to the list given in the GROUPING SETS clause, the input rows can be expanded into the following 8 rows (the number of grouping sets times the number of rows). At the same time, the GROUPING_ID over all grouping columns, and the values of the other grouping functions, are generated for each row:
+
+```
+1, 2       (GROUPING_ID: a, b -> 00->0)
+1, null    (GROUPING_ID: a, null -> 01 -> 1)
+null, 2    (GROUPING_ID: null, b -> 10 -> 2)
+null, null (GROUPING_ID: null, null -> 11 -> 3)
+
+3, 4       (GROUPING_ID: a, b -> 00 -> 0)
+3, null    (GROUPING_ID: a, null -> 01 -> 1)
+null, 4    (GROUPING_ID: null, b -> 10 -> 2)
+null, null (GROUPING_ID: null, null -> 11 -> 3)
+
+```
+
+Then, taking the 8 rows above as input, simply GROUP BY a, b, GROUPING_ID. A minimal sketch of this expansion follows below.
+
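+The following sketch illustrates the expansion rule just described: repeat each input row once per grouping set, null out the columns outside the set, and append the GROUPING_ID bitmask. It is for illustration only and is not the RepeatNode implementation.
+
+```java
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+// Toy illustration of the row expansion: a bit of GROUPING_ID is 1 when the
+// column is aggregated (not part of the grouping set) and 0 otherwise.
+public class GroupingSetsExpansion {
+    static List<Object[]> expand(Object[] row, String[] allCols, List<List<String>> groupingSets) {
+        List<Object[]> out = new ArrayList<>();
+        for (List<String> set : groupingSets) {
+            Object[] expanded = new Object[row.length + 1];
+            long groupingId = 0;
+            for (int i = 0; i < allCols.length; i++) {
+                boolean grouped = set.contains(allCols[i]);
+                expanded[i] = grouped ? row[i] : null;
+                groupingId = (groupingId << 1) | (grouped ? 0 : 1);
+            }
+            expanded[row.length] = groupingId; // appended GROUPING_ID column
+            out.add(expanded);
+        }
+        return out;
+    }
+
+    public static void main(String[] args) {
+        List<List<String>> sets = Arrays.asList(
+                Arrays.asList("a", "b"), Arrays.asList("a"),
+                Arrays.asList("b"), Arrays.asList());
+        for (Object[] r : expand(new Object[]{1, 2}, new String[]{"a", "b"}, sets)) {
+            System.out.println(Arrays.toString(r));
+        }
+        // prints: [1, 2, 0], [1, null, 1], [null, 2, 2], [null, null, 3]
+    }
+}
+```
+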
+### 3.2 Walking Through a Concrete Example
+
+Suppose there is a table t with the following columns and data:
+
+```
+mysql> select * from t;
++------+------+------+
+| k1   | k2   | k3   |
++------+------+------+
+| a    | A    |    1 |
+| a    | A    |    2 |
+| a    | B    |    1 |
+| a    | B    |    3 |
+| b    | A    |    1 |
+| b    | A    |    4 |
+| b    | B    |    1 |
+| b    | B    |    5 |
++------+------+------+
+8 rows in set (0.01 sec)
+
+```
+
+For the following query:
+
+```
+SELECT k1, k2, GROUPING_ID(k1,k2), SUM(k3) FROM t GROUP BY GROUPING SETS ((k1, k2), (k1), (k2), ());
+
+```
+
+First, the input rows are expanded: each row becomes 4 rows (the number of sets in the GROUPING SETS clause), and a GROUPING_ID() column is added:
+
+For example, the row a, A, 1 becomes the following 4 rows after expansion:
+
+```
++------+------+------+-------------------------+
+| k1   | k2   | k3   | GROUPING_ID(`k1`, `k2`) |
++------+------+------+-------------------------+
+| a    | A    |    1 |                       0 |
+| a    | NULL |    1 |                       1 |
+| NULL | A    |    1 |                       2 |
+| NULL | NULL |    1 |                       3 |
++------+------+------+-------------------------+
+
+```
+
+Finally, all the expanded input rows are as follows (32 rows in total):
+
+```
++------+------+------+-------------------------+
+| k1   | k2   | k3   | GROUPING_ID(`k1`, `k2`) |
++------+------+------+-------------------------+
+| a    | A    |    1 |                       0 |
+| a    | A    |    2 |                       0 |
+| a    | B    |    1 |                       0 |
+| a    | B    |    3 |                       0 |
+| b    | A    |    1 |                       0 |
+| b    | A    |    4 |                       0 |
+| b    | B    |    1 |                       0 |
+| b    | B    |    5 |                       0 |
+| a    | NULL |    1 |                       1 |
+| a    | NULL |    1 |                       1 |
+| a    | NULL |    2 |                       1 |
+| a    | NULL |    3 |                       1 |
+| b    | NULL |    1 |                       1 |
+| b    | NULL |    1 |                       1 |
+| b    | NULL |    4 |                       1 |
+| b    | NULL |    5 |                       1 |
+| NULL | A    |    1 |                       2 |
+| NULL | A    |    1 |                       2 |
+| NULL | A    |    2 |                       2 |
+| NULL | A    |    4 |                       2 |
+| NULL | B    |    1 |                       2 |
+| NULL | B    |    1 |                       2 |
+| NULL | B    |    3 |                       2 |
+| NULL | B    |    5 |                       2 |
+| NULL | NULL |    1 |                       3 |
+| NULL | NULL |    1 |                       3 |
+| NULL | NULL |    1 |                       3 |
+| NULL | NULL |    1 |                       3 |
+| NULL | NULL |    2 |                       3 |
+| NULL | NULL |    3 |                       3 |
+| NULL | NULL |    4 |                       3 |
+| NULL | NULL |    5 |                       3 |
++------+------+------+-------------------------+
+32 rows in set.
+
+```
+
+Now GROUP BY k1, k2, GROUPING_ID(`k1`, `k2`):
+
+```
++------+------+-------------------------+-----------+
+| k1   | k2   | grouping_id(`k1`, `k2`) | sum(`k3`) |
++------+------+-------------------------+-----------+
+| a    | A    |                       0 |         3 |
+| a    | B    |                       0 |         4 |
+| a    | NULL |                       1 |         7 |
+| b    | A    |                       0 |         5 |
+| b    | B    |                       0 |         6 |
+| b    | NULL |                       1 |        11 |
+| NULL | A    |                       2 |         8 |
+| NULL | B    |                       2 |        10 |
+| NULL | NULL |                       3 |        18 |
++------+------+-------------------------+-----------+
+9 rows in set (0.02 sec)
+
+```
+
+As can be seen, the result is identical to running GROUP BY on each subset of the GROUPING SETS clause and then taking the UNION:
+
+```
+select k1, k2, sum(k3) from t group by k1, k2
+UNION ALL
+select NULL, k2, sum(k3) from t group by k2
+UNION ALL
+select k1, NULL, sum(k3) from t group by k1
+UNION ALL
+select NULL, NULL, sum(k3) from t;
+
++------+------+-----------+
+| k1   | k2   | sum(`k3`) |
++------+------+-----------+
+| b    | B    |         6 |
+| b    | A    |         5 |
+| a    | A    |         3 |
+| a    | B    |         4 |
+| a    | NULL |         7 |
+| b    | NULL |        11 |
+| NULL | B    |        10 |
+| NULL | A    |         8 |
+| NULL | NULL |        18 |
++------+------+-----------+
+9 rows in set (0.06 sec)
+
+```
+
+### 3.3 FE Planning Phase
+
+#### 3.3.1 Main Tasks
+
+1. Introduce a GroupByClause class that encapsulates the GROUP BY related information and replaces the original groupingExprs.
+2. Add syntax support and syntax checks, error handling, and error messages for Grouping Sets, Cube, and RollUp.
+3. Add a GroupByClause member to the SelectStmt class.
+4. Introduce a GroupingFunctionCallExpr class that encapsulates grouping and grouping_id function calls.
+5. Introduce a VirtualSlot class that encapsulates the mapping between the virtual columns generated for grouping/grouping_id and the real columns.
+6. Add the virtual column GROUPING_ID and the virtual columns corresponding to the other grouping/grouping_id calls, and add these columns to the original groupingExprs expression list.
+7. Add a PlanNode, named RepeatNode to keep it general. For GroupingSets aggregation, a RepeatNode is inserted into the execution plan.
+
+#### 3.3.2 Tuple
+
+In the GroupByClause class, to add GROUPING_ID to the groupingExprs expression list, a virtual SlotRef needs to be created; accordingly, a tuple needs to be created for this slot, called the GROUPING_ID tuple.
+
+For the RepeatNode plan node, its input is all the tuples of its child node; its output tuple, besides repeating the child node's data, also needs to fill in GROUPING_ID and the virtual columns corresponding to the other grouping/grouping_id calls, so the RepeatNode needs its own output tuple containing these virtual slots.
+
+
+### 3.4 BE Query Execution Phase
+
+Main tasks:
+
+1. Through the execution class of RepeatNode, add the logic that expands the input rows. Its job is to repeat the original data before aggregation: add a GROUPING_ID column to each row, repeat each row once per set in the GroupingSets, set the corresponding columns to null, and set the values of the newly added virtual columns according to the grouping list.
+2. Implement the grouping_id() and grouping() functions.
+
+
+
+
diff --git a/docs/zh-CN/design/metadata-design.md b/docs/zh-CN/design/metadata-design.md
new file mode 100644
index 0000000000..a1cdd29ee2
--- /dev/null
+++ b/docs/zh-CN/design/metadata-design.md
@@ -0,0 +1,126 @@
+---
+{
+    "title": "元数据设计文档",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Metadata Design Document
+
+## Glossary
+
+* FE: Frontend, the frontend node of Doris. Mainly responsible for receiving and returning client requests, metadata and cluster management, query plan generation, and so on.
+* BE: Backend, the backend node of Doris. Mainly responsible for data storage and management, query plan execution, and so on.
+* bdbje: [Oracle Berkeley DB Java Edition](http://www.oracle.com/technetwork/database/berkeleydb/overview/index-093405.html). In Doris, bdbje is used for persisting the metadata operation log, FE high availability, and other functions.
+
+## Overall Architecture
+![](/images/palo_architecture.jpg)
+
+As shown above, the overall architecture of Doris has two layers. Multiple FEs form the first layer, providing horizontal scaling and high availability of the FE. Multiple BEs form the second layer, responsible for data storage and management. This document mainly describes the design and implementation of metadata in the FE layer.
+
+1. FE nodes are divided into two kinds: follower and observer. The FEs use bdbje ([BerkeleyDB Java Edition](http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/overview/index-093405.html)) for leader election, data synchronization, and so on.
+
+2. Among the followers, one is elected as the leader node through election and is responsible for metadata writes. When the leader goes down, the other follower nodes re-elect a leader, guaranteeing high availability of the service.
+
+3. Observer nodes only synchronize metadata from the leader and do not take part in elections. They can be scaled out to extend the read capacity of the metadata service.
+
+> Note: follower and observer correspond to the bdbje concepts replica and observer. Both sets of names may be used below.
+
+## Metadata Structure
+
+Doris metadata is fully in memory. Each FE keeps a complete metadata image in its memory. Inside Baidu, for a cluster with 2,500 tables and 1 million tablets (3 million replicas), the metadata occupies only about 2GB of memory. (Of course, the memory used by intermediate objects of queries, various job information, and so on needs to be estimated based on the actual situation, but the overall memory footprint remains fairly low.)
+
+Meanwhile, the metadata is stored in memory as a tree-like hierarchy, and auxiliary structures are added so that metadata at every level can be accessed quickly.
+
+The figure below shows what is stored in Doris metadata.
+
+![](/images/metadata_contents.png)
+
+As shown above, Doris metadata mainly stores four kinds of data:
+
+1. User data information, including databases, table schemas, tablet information, and so on.
+2. Various kinds of job information, such as load jobs, clone jobs, schema change jobs, and so on.
+3. User and privilege information.
+4. Cluster and node information.
+
+## Data Flow
+
+![](/images/metadata_stream.png)
+
+The metadata data flow works as follows:
+
+1. Only the leader FE can write metadata. After a write operation modifies the leader's memory, it is serialized into a log entry and written to bdbje as a key-value pair, where the key is a consecutive integer used as the log id, and the value is the serialized operation log.
+
+2. After the log is written to bdbje, bdbje replicates it to the other non-leader FE nodes according to its policy (write to a majority / write to all). The non-leader FE nodes replay the log, modify their own in-memory metadata image, and thereby stay in sync with the leader.
+
+3. When the number of log entries on the leader reaches a threshold (100,000 by default), a checkpoint thread is started. The checkpoint reads the existing image file and the logs after it, and replays a fresh copy of the metadata image in memory. The copy is then written to disk, forming a new image. The reason a new copy is replayed instead of writing out the existing image is that dumping the image requires holding a read lock, which would block writes. So each checkpoint temporarily uses twice the memory.
+
+4. After the image file is generated, the leader notifies the other non-leader nodes that a new image is available. The non-leader nodes pull the latest image file over http and replace their local old one.
+
+5. After the image is done, old logs in bdbje are deleted periodically.
+
+## Implementation Details
+
+### Metadata Directory
+
+1. The metadata directory is specified by the FE configuration item `meta_dir`.
+
+2. The `bdb/` directory stores the bdbje data.
+
+3. The `image/` directory stores the image files.
+
+	* 	`image.[logid]` is the latest image file. The suffix `logid` indicates the id of the last log entry contained in the image.
+	*  `image.ckpt` is the image file currently being written. If it is written successfully, it is renamed to `image.[logid]` and replaces the old image file.
+	*  The `VERSION` file records the `cluster_id`. The `cluster_id` uniquely identifies a Doris cluster. It is a 32-bit integer randomly generated when the leader starts for the first time. A cluster id can also be specified through the FE configuration item `cluster_id`.
+	*  The `ROLE` file records the FE's own role. There are only two roles, `FOLLOWER` and `OBSERVER`. `FOLLOWER` means the FE is an electable node. (Note: even the leader node has the role `FOLLOWER`.)
+
+### Startup Flow
+
+1. When an FE starts for the first time and the startup script is given no arguments, it tries to start as the leader. You will eventually see `transfer from UNKNOWN to MASTER` in the FE startup log.
+
+2. When an FE starts for the first time and the startup script specifies the `-helper` argument pointing to the correct leader FE node, the FE first asks the leader over http for its own role (i.e. ROLE) and cluster_id. It then pulls the latest image file. After reading the image file and building the metadata image, it starts bdbje and begins bdbje log synchronization. Once synchronization is complete, it replays the logs in bdbje after the image file and finishes building the final metadata image.
+
+	> Note 1: When starting with the `-helper` argument, the FE must first be added through the leader via a mysql command; otherwise startup fails with an error.
+	
+	> Note 2: `-helper` can point to any follower node, even if it is not the leader.
+	
+	> Note 3: While bdbje is synchronizing logs, the FE log shows `xxx detached`; the log pull is in progress and this is normal.
+
+3. When an FE starts and it is not the first time, if the startup script is given no arguments, it determines its identity from the locally stored ROLE information, and obtains leader information from the cluster information stored in its local bdbje. It then reads the local image file and the logs in bdbje to finish building the metadata image. (If the role recorded in the local ROLE file is inconsistent with what is recorded in bdbje, an error is reported.)
+
+4. When an FE starts and it is not the first time, and the startup script specifies the `-helper` argument, the flow is the same as the first start: it asks the leader for the role first, but compares it with the locally stored ROLE. If they are inconsistent, an error is reported.
+
+### Metadata Reads, Writes, and Synchronization
+
+1. Users can connect to any FE node with mysql to read and write metadata. If the connected node is a non-leader, it forwards write operations to the leader. After the leader writes successfully, it returns its current latest log id. The non-leader node then waits until the log id it has replayed is greater than the returned one before reporting success to the client. This guarantees Read-Your-Write semantics on any FE node.
+
+	> Note: Some non-write operations are also forwarded to the leader, for example `SHOW LOAD`. These commands usually need to read intermediate job states that are not written to bdbje, so a non-leader node does not have them in memory. (Metadata synchronization between FEs relies entirely on bdbje log replay; if a metadata change is not written to the bdbje log, its result is not visible on other non-leader nodes.)
+
+2. The leader node starts a TimePrinter thread. This thread periodically writes a key-value entry containing the current time to bdbje. The other non-leader nodes replay this entry and compare the recorded time with their local time. If the local time lags behind by more than the configured threshold (configuration item: `meta_delay_toleration_second`; the write interval is half of this value), the node switches to an **unreadable** state. This mechanism prevents a non-leader node that has lost contact with the leader for a long time from serving stale metadata.
+
+3. The metadata of each FE only guarantees eventual consistency. Normally, the inconsistency window is only a few milliseconds. We guarantee monotonic consistency of metadata access within the same session, but if the same client connects to different FEs, metadata regression may occur. (For batch-update systems, this has little impact.)
+
+### Downtime Recovery
+
+1. When the leader node goes down, the remaining followers immediately elect a new leader node to continue providing services.
+2. Metadata cannot be written when a majority of follower nodes are down. If a write request arrives while metadata is not writable, the current behavior is that the **FE process exits**. This logic will be optimized in the future so that read services are still provided in the non-writable state.
+3. The downtime of an observer node does not affect the state of any other node, nor does it affect metadata reads and writes on other nodes.
diff --git a/docs/zh-CN/design/spark_load.md b/docs/zh-CN/design/spark_load.md
new file mode 100644
index 0000000000..654d415862
--- /dev/null
+++ b/docs/zh-CN/design/spark_load.md
@@ -0,0 +1,212 @@
+---
+{
+    "title": "Doris支持spark导入设计文档",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Doris Spark Load Design Document
+
+## Background
+
+Doris currently supports several load methods such as Broker Load, Routine Load, Stream Load, and mini batch load.
+Spark Load is mainly intended for the scenario of initial migration, where a large amount of data is migrated into Doris, and is meant to speed up data loading.
+
+## Glossary
+
+* FE: Frontend, the frontend node of Palo. Mainly responsible for receiving and returning client requests, metadata and cluster management, query plan generation, and so on.
+* BE: Backend, the backend node of Palo. Mainly responsible for data storage and management, query plan execution, and so on.
+* Tablet: a horizontal shard of a Palo table is called a tablet.
+* Dpp: Data preprocessing, the data preprocessing module, which uses external compute resources (Hadoop, Spark) to preprocess the data to be loaded, including transformation, cleaning, partitioning, sorting, and aggregation.
+
+## Design
+
+### Goals
+
+The existing load methods in Doris do not support batch loads of hundreds of GB or more very well: many configuration changes are needed, the load may still fail to complete, performance is slow, and since there is no read/write separation, a lot of CPU and other resources are consumed. Such large loads occur when users migrate, so a load method based on a Spark cluster is needed, using the cluster's parallelism to perform the ETL computation, sorting, aggregation, and so on at load time, satisfying large-volume load requirements while reducing load time and migration cost.
+
+For Spark Load, multiple Spark deployment modes need to be considered; the design should be compatible with several deployment methods, and the yarn cluster mode can be implemented first. Also, since user data comes in many formats, data files in csv, parquet, orc, and other formats need to be supported.
+
+### Implementation Options
+
+Before describing the design of Spark Load, it is worth mentioning the existing load framework; for that, see "Doris Broker Load Implementation Analysis".
+
+#### Option 1
+
+Referring to the existing load framework and the original Hadoop load implementation used for Baidu's internal Hadoop clusters, and to reuse the existing load framework as much as possible and lower the development effort, the overall plan is as follows:
+
+After syntax and semantic analysis, the user's load statement produces a LoadStmt. An isSparkLoad flag field is added to LoadStmt; if it is true, a SparkLoadJob is created. Similar to BrokerLoadJob, the job is driven by a state machine: in the PENDING state a SparkLoadPendingTask is created, and in the LOADING state a LoadLoadingTask is still created to load the data. On the BE side, the existing plan execution framework is reused to execute the load plan.
+
+The main points to consider for Spark Load are the following:
+
+##### Syntax
+
+This is mainly about user habits: the load statement should stay as close as possible to the broker load statement. Here is one proposal:
+
+```
+		LOAD LABEL example_db.label1
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+		NEGATIVE
+        INTO TABLE `my_table`
+		PARTITION (p1, p2)
+		COLUMNS TERMINATED BY ","
+		columns(k1,k2,k3,v1,v2)
+		set (
+			v3 = v1 + v2,
+			k4 = hll_hash(k2)
+		)
+		where k1 > 20
+        )
+		with spark.cluster_name
+        PROPERTIES
+        (
+        "spark.master" = "yarn",
+		"spark.executor.cores" = "5",
+		"spark.executor.memory" = "10g",
+		"yarn.resourcemanager.address" = "xxx.tc:8032",
        "max_filter_ratio" = "0.1"
+        );
+```
+Here spark.cluster_name is the name of the Spark cluster the user loads with; it can be set through SET PROPERTY, similar to the original Hadoop cluster settings.
+The Spark cluster settings in the properties override the corresponding settings of spark.cluster_name.
+The meaning of each property is:
+- spark.master: the Spark deployment mode, supporting yarn/standalone/local/k8s. Yarn support is expected to be implemented first, using yarn-cluster mode (yarn-client mode is generally used for interactive scenarios).
+- spark.executor.cores: the number of CPU cores per executor
+- spark.executor.memory: the memory size per executor
+- yarn.resourcemanager.address: the address of the yarn resourcemanager
+- max_filter_ratio: the maximum allowed filter ratio threshold
+
+##### SparkLoadJob
+
+After the user submits a Spark Load statement and it is parsed, a SparkLoadJob is created:
+
+```
+SparkLoadJob:
+         +-------+-------+
+         |    PENDING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadPendingTask    |
+                 v                         |
+         +-------+-------+                 |
+         |    LOADING    |-----------------|
+         +-------+-------+                 |
+				 | LoadLoadingTask         |
+                 v                         |
+         +-------+-------+                 |
+         |  COMMITTED    |-----------------|
+         +-------+-------+                 |
+				 |                         |
+                 v                         v  
+         +-------+-------+         +-------+-------+     
+         |   FINISHED    |         |   CANCELLED   |
+         +-------+-------+         +-------+-------+
+				 |                         Λ
+                 +-------------------------+
+```
+The diagram above shows the execution flow of SparkLoadJob.
+
+##### SparkLoadPendingTask
+SparkLoadPendingTask is mainly used to submit the Spark ETL job to the Spark cluster. Since Spark supports different deployment models (localhost, standalone, yarn, k8s), a generic interface, SparkEtlJob, needs to be abstracted to implement the Spark ETL functionality. Its main methods are:
+- submit a Spark ETL job
+- cancel a Spark ETL job
+- get the status of a Spark ETL job
+
+The rough interface looks like this:
+```
+class SparkEtlJob {
+	// submit a spark etl job
+	// returns the JobId
+	String submitJob(TBrokerScanRangeParams params);
+
+	// cancel a job, used to support user-initiated cancellation of a load job
+	bool cancelJob(String jobId);
+
+	// get the job status, used to check whether the job has finished
+	JobStatus getJobStatus(String jobId);
+private:
+	std::list<DataDescription> data_descriptions;
+};
+```
+Different subclasses can be implemented to support the different cluster deployment modes; for example, a SparkEtlJobForYarn can support Spark load jobs on a yarn cluster. Concretely, the JobId in the interface above is the appid on the Yarn cluster. How do we get the appid? One option is to submit the Spark job through the spark-submit client and then parse the standard error output, extracting the appid by text matching (a sketch follows below).
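+
+Below is a minimal, hypothetical sketch of that text matching; the sample output line and the regular expression are illustrative and assume that the yarn client prints a line containing the application id to stderr.
+
+```java
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+// Hypothetical sketch of extracting the yarn application id from spark-submit
+// client output by text matching. The sample line and regex are illustrative.
+public class YarnAppIdExtractor {
+    private static final Pattern APP_ID_PATTERN =
+            Pattern.compile("application_\\d+_\\d+");
+
+    public static String extractAppId(String sparkSubmitStderr) {
+        Matcher matcher = APP_ID_PATTERN.matcher(sparkSubmitStderr);
+        return matcher.find() ? matcher.group() : null;
+    }
+
+    public static void main(String[] args) {
+        String sample = "INFO yarn.Client: Submitting application application_1573630236578_0042 to ResourceManager";
+        System.out.println(extractAppId(sample)); // application_1573630236578_0042
+    }
+}
+```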
+
+Here the experience of Hadoop dpp jobs is relevant: because of data volume, cluster queues, and other reasons, running jobs may hit the limit on the number of concurrent load jobs, causing subsequent submissions to fail, so job pile-up must be considered. One option is a separate limit on the number of concurrent Spark load jobs, plus a per-user concurrency limit, so that different users' jobs do not interfere with each other, improving the user experience.
+
+The Spark job itself covers the following key points:
+1. Type conversion (extraction/transformation)
+
+	Convert the fields of the source files into the concrete column types (check whether fields are valid, perform function computation, and so on)
+2. Function computation (transformation), including negative load computation
+	
+	Perform the column function computations specified by the user. Function list: "strftime","time_format","alignment_timestamp","default_value","md5sum","replace_value","now","hll_hash","substitute"
+3. Extraction of columns from path
+4. Filtering by the where condition
+5. Partitioning and bucketing
+6. Sorting and pre-aggregation
+
+	Since sorting and aggregation happen in OlapTableSink anyway, they are not logically required here, but sorting and pre-aggregation can improve the efficiency of the load on the BE side. **If sorting and aggregation are done in the Spark ETL job, this step can be skipped when the BE executes the load.** This can be adjusted based on later testing; for now, sorting in the ETL job looks reasonable.
+	Another thing to consider is how to support the global dictionary for the bitmap type; bitmap columns over string types depend on a global dictionary.
+	To tell the downstream whether the ETL job has already done the sorting and aggregation, a job.json description file can be generated when the job finishes, containing the following properties:
+
+	```
+	{
+		"is_segment_file" : "false",
+		"is_sort" : "true",
+		"is_agg" : "true"
+	}
+	```
+	where:
+		is_sort indicates whether the data is sorted
+		is_agg indicates whether the data is aggregated
+		is_segment_file indicates whether segment files were generated
+
+7. Currently rollup data is always computed from the base table; it is worth considering using the hierarchy between indexes to optimize rollup generation.
+
+The relatively complex part here is the support for column expression computation.
+
+Finally, after the Spark load job finishes, the output files can be stored as csv, parquet, or orc; in terms of storage efficiency, parquet is recommended as the default.
+
+##### LoadLoadingTask
+
+LoadLoadingTask can reuse the existing logic, but one place differs from BrokerLoadJob: the data files processed by the Spark ETL task have already gone through column mapping, function computation, negative load, filtering, aggregation, and so on, so LoadLoadingTask does not need to perform these operations again and only needs to do simple column mapping and type conversion.
+
+##### BE Load Task Execution
+
+This can fully reuse the existing load framework and should not require any changes.
+
+#### Option 2
+
+Option 1 reuses the existing load framework as much as possible and can quickly deliver support for large-volume loads. However, it has the following problem: the data produced by the Spark ETL job is already partitioned by tablet, yet the existing broker load framework still computes the partition and bucket for the streamed data and then serializes and sends it over RPC to the target BE, incurring an extra round of serialization and network IO. In option 2, the SparkEtlJob directly generates Doris's storage format, segment files, and the three replicas are then loaded through a clone-like mechanism via the add_rowset interface. The concrete differences are:
+
+1. A tabletid suffix needs to be added to the generated files
+2. An interface protected Map<long, Pair<String, Long>> getFilePathMap() is added to the SparkLoadPendingTask class to return the mapping between tablet ids and files
+3. A spark_push interface is added to the BE RPC service, which pulls the ETL-converted files from the source to the local node (they can be read through a broker) and then completes the load through the add_rowset interface, similar to the clone logic
+4. A new load task, SparkLoadLoadingTask, is created. Its main job is to read the job.json file, parse its properties, and pass them as RPC parameters when calling the spark_push interface to send load requests to the backend BEs hosting the tablets. In the BE, spark_push decides how to proceed based on is_segment_file: if it is true, the segment files are downloaded directly and added via add rowset; if it is false, the pusher logic is used to load the data.
+
+This option moves segment file generation into the Spark cluster as well, which greatly reduces the load on the Doris cluster and should be quite efficient. However, option 2 depends on packaging the low-level rowset and segment v2 interfaces into a standalone .so file, which Spark calls to convert the data into segment files.
+
+## Summary
+
+Comparing the two options, the first requires fewer changes but makes the BE do duplicate work, while the second can draw on the original Hadoop load framework. Therefore, the plan is to deliver Spark load in two steps.
+
+In the first step, following option 2, use Spark to partition, sort, and aggregate the load data and produce files in parquet format, then go through the Hadoop pusher flow, in which the BE converts the format.
+
+In the second step, wrap the segment writing code as a library, generate Doris's underlying format directly, and add an RPC interface implementing clone-like load logic.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org