Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/04/28 11:55:42 UTC

[GitHub] [incubator-doris] carlvinhust2012 commented on a diff in pull request #9297: [fix][doc]add design doc

carlvinhust2012 commented on code in PR #9297:
URL: https://github.com/apache/incubator-doris/pull/9297#discussion_r860769085


##########
docs/en/design/Flink doris connector Design.md:
##########
@@ -0,0 +1,259 @@
+---
+{
+    "title": "Flink doris connector Design",
+    "language": "en"
+}
+
+
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink doris connector Design
+
+
+
+First of all, thanks to the authors of the community's Spark Doris Connector.
+
+From Doris's perspective, bringing its data into Flink gives access to Flink's rich ecosystem, which broadens what the product can do and makes it possible to query Doris jointly with other data sources.
+
+Starting from our business architecture and business needs, we chose Flink as the ETL and real-time computing framework in our architecture. The community already supports a Spark Doris Connector, so we designed and developed the Flink Doris Connector with reference to it.
+
+## Technical Choice
+
+When we first chose an approach, we faced the same decision as the Spark Doris Connector and initially considered the JDBC method. As described in the Spark Doris Connector article, that method has advantages, but its disadvantages are more obvious. After reading and testing the Spark connector code, we decided to stand on the shoulders of giants (note: we copied the code and modified it directly).
+
+The following content is copied directly from the Spark Doris Connector blog:
+
+```
+Therefore, we developed a new data source, Spark-Doris-Connector, for Doris. Under this scheme, Doris can publish its data and distribute it to Spark. The Spark driver accesses Doris's FE to obtain the Doris table schema and the underlying data distribution. Then, according to this data distribution, the data query tasks are allocated to the executors. Finally, Spark's executors access different BEs for querying, which greatly improves query efficiency.
+```
+
+## 1. Instructions
+
+Compile and generate doris-flink-1.0.0-SNAPSHOT.jar in the extension/flink-doris-connector/ directory of the Doris code base, add this jar to Flink's ClassPath, and then you can use the Flink-on-Doris function.
+
+## 2. How to use
+
+Compile and generate doris-flink-1.0.0-SNAPSHOT.jar in the extension/flink-doris-connector/ directory of the Doris code base, add this jar to Flink's ClassPath, and then use the Flink-on-Doris function.
+
+#### 2.1 SQL way
+
+Supported functions:
+
+1. Supports reading data from Doris data warehouse tables into Flink through Flink SQL for computation
+2. Supports inserting data into the corresponding data warehouse table through Flink SQL. On the back end, the connector communicates directly with BE through Stream Load to complete the insert
+3. Flink can join Doris tables with external, non-Doris data sources for association analysis
+
+Example:
+
+
+
+```java
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        tEnv.executeSql(
+                "CREATE TABLE test_aggregation01 (" +
+                        "user_id STRING," +
+                        "user_city STRING," +
+                        "age INT," +
+                        "last_visit_date STRING" +
+                        ") " +
+                        "WITH (\n" +
+                        "  'connector' = 'doris',\n" +
+                        "  'fenodes' = 'doris01:8030',\n" +
+                        "  'table.identifier' = 'demo.test_aggregation',\n" +
+                        "  'username' = 'root',\n" +
+                        "  'password' = ''\n" +
+                        ")");
+        tEnv.executeSql(
+                "CREATE TABLE test_aggregation02 (" +
+                        "user_id STRING," +
+                        "user_city STRING," +
+                        "age INT," +
+                        "last_visit_date STRING" +
+                        ") " +
+                        "WITH (\n" +
+                        "  'connector' = 'doris',\n" +
+                        "  'fenodes' = 'doris01:8030',\n" +
+                        "  'table.identifier' = 'demo.test_aggregation_01',\n" +
+                        "  'username' = 'root',\n" +
+                        "  'password' = ''\n" +
+                        ")");
+
+        tEnv.executeSql("INSERT INTO test_aggregation02 select * from test_aggregation01");
+        tEnv.executeSql("select count(1) from test_aggregation01");
+```
+
+#### 2.2 DataStream way:
+
+```java
+DorisOptions.Builder options = DorisOptions.builder()
+                .setFenodes("$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
+                .setUsername("$YOUR_DORIS_USERNAME")
+                .setPassword("$YOUR_DORIS_PASSWORD")
+                .setTableIdentifier("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME");
+env.addSource(new DorisSourceFunction<>(options.build(),new SimpleListDeserializationSchema())).print();
+```
+
+## 3. Applicable scenarios
+
+![1616987965864](/images/Flink-doris-connector.png)
+
+#### 3.1. Use Flink to perform joint analysis on data in Doris and other data sources
+
+Many business departments place their data on different storage systems, such as some online analysis and report data in Doris, some structured retrieval data in Elasticsearch, and some data used for transaction processing in MySQL, and so on. It is often necessary to analyze the business across multiple storage sources. After connecting Flink and Doris through the Flink Doris connector, companies can directly use Flink to perform joint query calculations on the data in Doris and multiple external data sources.
+
+#### 3.2 Real-time data access
+
+Before Flink Doris Connector: irregular business data usually has to be standardized first, with null values filtered out and the cleaned messages written to a new topic, and then a regular load job is started to write the data into Doris.
+
+![1616988281677](/images/Flink-doris-connector1.png)
+
+After Flink Doris Connector: Flink reads from Kafka and writes to Doris directly.
+
+![1616988514873](/images/Flink-doris-connector2.png)
+
+## 4. Technical realization
+
+### 4.1 Architecture diagram
+
+![1616997396610](/images/Flink-doris-connector-architecture.png)
+
+### 4.2 Doris provides more external capabilities
+
+#### 4.2.1 Doris FE
+
+The interfaces for obtaining internal table metadata, single-table query plans, and some statistics have been opened to the outside world.
+
+All REST API interfaces require HTTP Basic authentication. The user name and password are the ones used to log in to the database. Make sure the user has been granted the correct permissions.
+
+```
+// Get table schema meta information
+GET api/{database}/{table}/_schema
+
+// Get the query plan template for a single table
+POST api/{database}/{table}/_query_plan
+{
+"sql": "select k1, k2 from {database}.{table}"
+}
+
+// Get the table size
+GET api/{database}/{table}/_count
+```
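
The schema endpoint above can be called with any HTTP client. The following is a minimal sketch, not the connector's actual code: it only builds the authenticated GET request with the JDK's `java.net.http` API. The class and method names are hypothetical, the `http://` scheme is an assumption, and the host, database and table values are taken from the SQL example earlier in this document.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, for illustration only.
class DorisSchemaRequestSketch {

    // Builds the HTTP Basic header value: "Basic " + base64("user:password").
    static String basicAuth(String user, String password) {
        String raw = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    // Builds (but does not send) a GET request for api/{database}/{table}/_schema.
    // Assumption: the FE REST port is reachable over plain http.
    static HttpRequest schemaRequest(String feNode, String database, String table,
                                     String user, String password) {
        URI uri = URI.create("http://" + feNode + "/api/" + database + "/" + table + "/_schema");
        return HttpRequest.newBuilder(uri)
                .header("Authorization", basicAuth(user, password))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = schemaRequest("doris01:8030", "demo", "test_aggregation", "root", "");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request, for example with `HttpClient.newHttpClient().send(...)`, is left out so that the sketch does not depend on a running FE.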
+
+#### 4.2.2 Doris BE
+
+Through the Thrift protocol, data filtering, scanning, and pruning capabilities are exposed directly to the outside world.
+
+```
+service TDorisExternalService {
+    // Initialize the query executor
+    TScanOpenResult open_scanner(1: TScanOpenParams params);
+
+    // Fetch data in streaming batches, in Apache Arrow data format
+    TScanBatchResult get_next(1: TScanNextBatchParams params);
+
+    // End the scan
+    TScanCloseResult close_scanner(1: TScanCloseParams params);
+}
+
+For definitions of Thrift related structures, please refer to:
+
+https://github.com/apache/incubator-doris/blob/master/gensrc/thrift/DorisExternalService.thrift
+
+### 4.3 Implement DataStream
+
+Extend org.apache.flink.streaming.api.functions.source.RichSourceFunction to implement a custom DorisSourceFunction. During initialization, obtain the execution plan of the relevant table and the corresponding partitions.
+
+Override the run method to read data from the partitions in a loop.
+
+```
+public void run(SourceContext sourceContext) {
+    // Loop over the partitions
+    for (PartitionDefinition partition : dorisPartitions) {
+        scalaValueReader = new ScalaValueReader(partition, settings);
+        while (scalaValueReader.hasNext()) {
+            Object next = scalaValueReader.next();
+            sourceContext.collect(next);
+        }
+    }
+}
+```
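
The read loop above can be exercised without Flink or Doris. The sketch below is an assumption-laden stand-in, not the connector's code: a generic `openReader` function plays the role of `new ScalaValueReader(partition, settings)`, and a `Consumer` plays the role of `sourceContext.collect`. All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the partition loop in DorisSourceFunction.run.
class PartitionReadLoopSketch {

    // Visits the partitions in order, drains one reader per partition, and
    // forwards every record to the collector.
    static <P, T> void readAll(List<P> partitions,
                               Function<P, Iterator<T>> openReader,
                               Consumer<T> collector) {
        for (P partition : partitions) {
            Iterator<T> reader = openReader.apply(partition);
            while (reader.hasNext()) {
                collector.accept(reader.next());
            }
        }
    }

    public static void main(String[] args) {
        // Each inner list stands in for the rows of one Doris partition.
        List<List<String>> partitions = List.of(List.of("a", "b"), List.of("c"));
        List<String> collected = new ArrayList<>();
        PartitionReadLoopSketch.<List<String>, String>readAll(
                partitions, p -> p.iterator(), r -> collected.add(r));
        System.out.println(collected);
    }
}
```

Because the partitions are read one after another in a single loop, this style of source reads serially; the input-split approach described in section 4.4 is what allows parallel reads.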
+
+### 4.4 Implement Flink SQL on Doris
+
+Refer to [Flink Custom Source&Sink](https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/sourceSinks.html) and the Flink JDBC connector for the implementation described below. As a result, Flink SQL can be used to operate on Doris tables directly, including reading and writing.
+
+#### 4.4.1 Implementation details
+
+1. Implement DynamicTableSourceFactory and DynamicTableSinkFactory to register the doris connector
+2. Customize DynamicTableSource and DynamicTableSink to generate logical plans
+3. After DorisRowDataInputFormat and DorisDynamicOutputFormat obtain the logical plan, execution starts
+
+![1616747472136](/images/table_connectors.svg)
+
+The most important parts of the implementation are DorisRowDataInputFormat and DorisDynamicOutputFormat, which are customized based on RichInputFormat and RichOutputFormat.
+
+In DorisRowDataInputFormat, createInputSplits divides the obtained dorisPartitions into multiple splits for parallel computing.
+
+```java
+public DorisTableInputSplit[] createInputSplits(int minNumSplits) {
+		List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
+		int splitNum = 0;
+		for (PartitionDefinition partition : dorisPartitions) {
+			dorisSplits.add(new DorisTableInputSplit(splitNum++,partition));
+		}
+		return dorisSplits.toArray(new DorisTableInputSplit[0]);
+}
+
+public RowData nextRecord(RowData reuse)  {
+		if (!hasNext) {
+            //After reading the data, return null

Review Comment:
   It is recommended to adjust the indentation.



##########
docs/en/design/doris_storage_optimization.md:
##########
@@ -0,0 +1,235 @@
+---
+{
+    "title": "Doris Storage File Format Optimization",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Doris Storage File Format Optimization #
+
+## File format ##
+
+![](/images/segment_v2.png)
+<center>1. doris segment</center>
+
+The file consists of:
+- An 8-byte magic code at the start of the file, which identifies the file format and version
+- Data Region: stores the data of each column; the data is loaded on demand, page by page.
+- Index Region: Doris stores the index data of all columns together in the Index Region, where the data is loaded at column granularity, so it is stored separately from the column data.
+- Footer
+	- FileFooterPB: metadata that describes the file
+	- A 4-byte checksum of the FileFooterPB content
+	- A 4-byte FileFooterPB message length, used for reading FileFooterPB
+	- An 8-byte MAGIC CODE, stored at the very end of the file to make it easy to identify the file type in different scenarios.
+
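
The footer layout listed above implies that a reader locates FileFooterPB by walking backwards from the end of the file. The following is a minimal sketch of that walk under stated assumptions: the fields appear in the order listed (FileFooterPB, checksum, length, magic), the length is little-endian, and the magic value is supplied by the caller. None of these details, nor the class and method names, are confirmed by this document, and the checksum is skipped because its algorithm is not specified here.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical reader for the fixed-size trailer described above.
class SegmentFooterSketch {
    static final int MAGIC_LEN = 8;
    static final int LENGTH_LEN = 4;
    static final int CHECKSUM_LEN = 4;

    // Returns the raw FileFooterPB bytes, located by reading the trailer backwards.
    static byte[] footerBytes(byte[] file, byte[] expectedMagic) {
        if (file.length < MAGIC_LEN + LENGTH_LEN + CHECKSUM_LEN) {
            throw new IllegalArgumentException("file too short for a footer trailer");
        }
        int magicStart = file.length - MAGIC_LEN;
        byte[] magic = Arrays.copyOfRange(file, magicStart, file.length);
        if (!Arrays.equals(magic, expectedMagic)) {
            throw new IllegalArgumentException("trailing magic code does not match");
        }
        int lengthStart = magicStart - LENGTH_LEN;
        // Assumption: the message length is stored little-endian.
        int footerLen = ByteBuffer.wrap(file, lengthStart, LENGTH_LEN)
                .order(ByteOrder.LITTLE_ENDIAN)
                .getInt();
        int checksumStart = lengthStart - CHECKSUM_LEN; // checksum is not verified here
        int footerStart = checksumStart - footerLen;
        if (footerLen < 0 || footerStart < 0) {
            throw new IllegalArgumentException("invalid footer length: " + footerLen);
        }
        return Arrays.copyOfRange(file, footerStart, checksumStart);
    }

    public static void main(String[] args) {
        byte[] magic = "D0R1SSEG".getBytes(); // made-up magic value for the demo
        ByteBuffer buf = ByteBuffer.allocate(8 + 3 + 4 + 4 + 8).order(ByteOrder.LITTLE_ENDIAN);
        buf.put(magic).put(new byte[] {9, 8, 7}).putInt(0).putInt(3).put(magic);
        System.out.println(footerBytes(buf.array(), magic).length);
    }
}
```

The returned bytes would then be handed to the protobuf parser for the footer message.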
+The data in the file is organized into pages, which are the basic unit of encoding and compression. The current page types include the following:
+
+### DataPage ###
+
+Data pages are divided into two types: nullable and non-nullable.
+
+A nullable data page includes:
+```
+
+                 +----------------+
+                 |  value count   |
+                 |----------------|
+                 |  first row id  |
+                 |----------------|
+                 | bitmap length  |
+                 |----------------|
+                 |  null bitmap   |
+                 |----------------|
+                 |     data       |
+                 |----------------|
+                 |    checksum    |
+                 +----------------+
+```
+
+non -zero data page32467;- 26500;- 229140;-

Review Comment:
   This line is different from the line 68 of 'zh-CN/design/doris_storage_optimization.md'



##########
docs/en/design/doris_storage_optimization.md:
##########
@@ -0,0 +1,235 @@
+---
+{
+    "title": "Doris Storage File Format Optimization",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Doris Storage File Format Optimization #
+
+## File format ##
+
+![](/images/segment_v2.png)
+<center>1. doris segment</center>
+
+The file consists of:
+- An 8-byte magic code at the start of the file, which identifies the file format and version
+- Data Region: stores the data of each column; the data is loaded on demand, page by page.
+- Index Region: Doris stores the index data of all columns together in the Index Region, where the data is loaded at column granularity, so it is stored separately from the column data.
+- Footer
+	- FileFooterPB: metadata that describes the file
+	- A 4-byte checksum of the FileFooterPB content
+	- A 4-byte FileFooterPB message length, used for reading FileFooterPB
+	- An 8-byte MAGIC CODE, stored at the very end of the file to make it easy to identify the file type in different scenarios.
+
+The data in the file is organized into pages, which are the basic unit of encoding and compression. The current page types include the following:
+
+### DataPage ###
+
+Data pages are divided into two types: nullable and non-nullable.
+
+A nullable data page includes:
+```
+
+                 +----------------+
+                 |  value count   |
+                 |----------------|
+                 |  first row id  |
+                 |----------------|
+                 | bitmap length  |
+                 |----------------|
+                 |  null bitmap   |
+                 |----------------|
+                 |     data       |
+                 |----------------|
+                 |    checksum    |
+                 +----------------+
+```
+
+The structure of a non-nullable data page is as follows:
+
+```
+                 +----------------+
+                 |   value count  |
+                 |----------------|
+                 |  first row id  |
+                 |----------------|
+                 |     data       |
+                 |----------------|
+                 |    checksum    |
+                 +----------------+
+```
+
+The meaning of each field is as follows:
+
+- value count
+	- The number of rows in the page
+- first row id
+	- The row number of the first row in the page
+- bitmap length
+	- The number of bytes in the null bitmap that follows
+- null bitmap
+	- A bitmap that records which values are null
+- data
+	- Stores the data after encoding and compression
+	- The data header must record is_compressed
+	- Data written with different encodings needs to record some fields in the header so that the data can be parsed.
+	- TODO: Add header information for various encodings
+- checksum
+	- Stores a page-granularity checksum that covers the page header and the actual data that follows
+
+
+### Bloom Filter Pages ###
+
+For each bloom filter column, one bloom filter page is generated per data page and saved in the bloom filter pages area.
+
+### Ordinal Index Page ###
+
+For each column, a sparse index of row numbers is built at page granularity. Each entry maps the row number of the first row of a page to a pointer to that page (including offset and length).
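
A lookup against such an ordinal index amounts to finding the last page whose first row number is not greater than the requested row. The sketch below illustrates that with a `TreeMap`. It is an in-memory illustration only: the class, its methods, and the `PagePointer` holder are hypothetical and do not describe the on-disk index encoding.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a per-column ordinal index.
class OrdinalIndexSketch {

    // Offset and length of a page inside the segment file.
    static final class PagePointer {
        final long offset;
        final int length;

        PagePointer(long offset, int length) {
            this.offset = offset;
            this.length = length;
        }
    }

    // Key: row number of the first row in a page. Value: pointer to that page.
    private final TreeMap<Long, PagePointer> firstRowToPage = new TreeMap<>();

    void addPage(long firstRowId, long offset, int length) {
        firstRowToPage.put(firstRowId, new PagePointer(offset, length));
    }

    // Returns the page that contains rowId, i.e. the entry with the greatest
    // first row number <= rowId, or null if rowId precedes every page.
    PagePointer pageFor(long rowId) {
        Map.Entry<Long, PagePointer> entry = firstRowToPage.floorEntry(rowId);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        OrdinalIndexSketch index = new OrdinalIndexSketch();
        index.addPage(0, 0, 100);
        index.addPage(1000, 100, 80);
        index.addPage(2000, 180, 60);
        System.out.println(index.pageFor(1500).offset);
    }
}
```

Because only one entry is kept per page, the index stays small, and a row lookup costs one floor search plus a scan inside the selected page.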
+
+### Short Key Index page ###
+
+We generate a sparse short key index entry every N rows (configurable); each entry maps a short key to a row number (ordinal).
+
+### Column's other indexes ###
+
+The format design supports the subsequent expansion of other index information, such as bitmap index, spatial index, etc. It only needs to write the required data to the existing column data, and add the corresponding metadata fields to FileFooterPB.
+
+### Metadata Definition ###
+SegmentFooterPB is defined as:
+
+```
+message ColumnPB {
+    required int32 unique_id = 1;   // The column id is used here, and the column name is not used
+    optional string name = 2;   // Column name,  when name equals __DORIS_DELETE_SIGN__, this column is a hidden delete column
+    required string type = 3;   // Column type
+    optional bool is_key = 4;   // Whether column is a primary key column
+    optional string aggregation = 5;    // Aggregate type
+    optional bool is_nullable = 6;      // Whether column is allowed to assign null
+    optional bytes default_value = 7;   // Default value
+    optional int32 precision = 8;       // Precision of column
+    optional int32 frac = 9;
+    optional int32 length = 10;         // Length of column
+    optional int32 index_length = 11;   // Length of column index
+    optional bool is_bf_column = 12;    // Whether column has bloom filter index
+    optional bool has_bitmap_index = 15 [default=false];  // Whether column has bitmap index
+}
+
+// page offset
+message PagePointerPB {
+	required uint64 offset; // offset of page in segment file
+	required uint32 length; // length of page
+}
+
+message MetadataPairPB {
+  optional string key = 1;
+  optional bytes value = 2;
+}
+
+message ColumnMetaPB {
+	optional ColumnMessage encoding; // Encoding of column
+
+	optional PagePointerPB dict_page; // Dictionary page
+	repeated PagePointerPB bloom_filter_pages; // Bloom filter pages
+	optional PagePointerPB ordinal_index_page; // Ordinal index page
+	optional PagePointerPB page_zone_map_page; // Page level of statistics index data
+
+	optional PagePointerPB bitmap_index_page; // Bitmap index page
+
+	optional uint64 data_footprint; // The size of the data in the column
+	optional uint64 index_footprint; // The size of the index in the column
+	optional uint64 raw_data_footprint; // Original column data size
+
+	optional CompressKind compress_kind; // Column compression type
+
+	optional ZoneMapPB column_zone_map; // Segment level of statistics index data
+	repeated MetadataPairPB column_meta_datas;
+}
+
+message SegmentFooterPB {
+	optional uint32 version = 2 [default = 1]; // For version compatibility and upgrade use
+	repeated ColumnPB schema = 5; // Schema of columns
+  optional uint64 num_values = 4; // Number of lines saved in the file

Review Comment:
   It is recommended to adjust the indentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


For additional commands, e-mail: commits-help@doris.apache.org