Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/23 02:33:42 UTC

[flink-table-store] branch master updated: [FLINK-28129] Add documentation for rescale bucket

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new df78da0  [FLINK-28129] Add documentation for rescale bucket
df78da0 is described below

commit df78da0e428467af3153984199300f049c22878f
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Thu Jun 23 10:33:36 2022 +0800

    [FLINK-28129] Add documentation for rescale bucket
    
    This closes #167
---
 docs/content/docs/development/rescale-bucket.md | 141 ++++++++++++++++++++++++
 1 file changed, 141 insertions(+)

diff --git a/docs/content/docs/development/rescale-bucket.md b/docs/content/docs/development/rescale-bucket.md
new file mode 100644
index 0000000..5ef9562
--- /dev/null
+++ b/docs/content/docs/development/rescale-bucket.md
@@ -0,0 +1,141 @@
+---
+title: "Rescale Bucket"
+weight: 5
+type: docs
+aliases:
+- /development/rescale-bucket.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Rescale Bucket
+
+Since the total number of buckets dramatically influences performance, Table Store allows users to
+tune the bucket number with the `ALTER TABLE` command and reorganize the data layout with `INSERT OVERWRITE`,
+without recreating the table/partition. When executing overwrite jobs, the framework automatically
+scans the data with the old bucket number and hashes each record according to the new bucket number.
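The rehash step can be pictured with a small sketch. This is not Table Store's actual hash function (which is internal to the framework); Python's built-in `hash` stands in only to show how the same records land in different buckets once the total changes:

```python
def bucket_of(key, num_buckets):
    # A record's bucket is its hashed bucket key modulo the total
    # bucket number (Python's hash() is only a stand-in here).
    return hash(key) % num_buckets

keys = ["order-1", "order-2", "order-3"]

# Layout written with the old bucket number ...
old_layout = {k: bucket_of(k, 4) for k in keys}

# ... is rescanned by INSERT OVERWRITE and rehashed with the new one.
new_layout = {k: bucket_of(k, 8) for k in keys}
```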
+
+## Rescale Overwrite
+```sql
+-- rescale number of total buckets
+ALTER TABLE table_identifier SET ('bucket' = '...')
+
+-- reorganize data layout of table/partition
+INSERT OVERWRITE table_identifier [PARTITION (part_spec)]
+SELECT ... 
+FROM table_identifier
+[WHERE part_spec]
+``` 
+
+Please note that
+- `ALTER TABLE` only modifies the table's metadata and will **NOT** reorganize or reformat existing data.
+  Reorganizing existing data must be done with `INSERT OVERWRITE`.
+- Rescaling the bucket number does not affect currently running read or write jobs.
+- Once the bucket number is changed, any newly scheduled `INSERT INTO` jobs that write to a not-yet-reorganized
+  existing table/partition will throw a `TableException` with a message like
+  ```text
+  Try to write table/partition ... with a new bucket num ..., 
+  but the previous bucket num is ... Please switch to batch mode, 
+  and perform INSERT OVERWRITE to rescale current data layout first.
+  ```
+- For partitioned tables, different partitions can have different bucket numbers. *E.g.*
+  ```sql
+  ALTER TABLE my_table SET ('bucket' = '4');
+  INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01')
+  SELECT * FROM ...;
+  
+  ALTER TABLE my_table SET ('bucket' = '8');
+  INSERT OVERWRITE my_table PARTITION (dt = '2022-01-02')
+  SELECT * FROM ...;
+  ```
+- While the overwrite job is running, make sure there are no other jobs writing to the same table/partition.
+
+{{< hint info >}}
+__Note:__ For tables with a log system enabled (*e.g.* Kafka), please rescale the topic's partitions as well to keep consistency.
+{{< /hint >}}
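Following the note above, a Kafka topic's partition count can be raised with the stock Kafka CLI; the topic name, bootstrap server, and target count below are illustrative placeholders, not values taken from this document:

```shell
# Placeholder topic/bootstrap values; adjust to your deployment.
# Raise the log topic's partition count to match the new bucket number.
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter --topic my_table_log --partitions 32
```

Note that Kafka only allows increasing a topic's partition count, so scaling buckets down has no direct topic-side counterpart.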
+
+## Use Case
+
+Rescaling buckets helps to handle sudden spikes in throughput. Suppose there is a daily streaming ETL task to sync transaction data. The table's DDL and pipeline
+are listed as follows.
+
+```sql
+-- table DDL
+CREATE TABLE verified_orders (
+    trade_order_id BIGINT,
+    item_id BIGINT,
+    item_price DOUBLE,
+    dt STRING,
+    PRIMARY KEY (dt, trade_order_id, item_id) NOT ENFORCED
+) PARTITIONED BY (dt)
+WITH (
+    'bucket' = '16'
+);
+
+-- streaming insert as bucket num = 16
+INSERT INTO verified_orders
+SELECT trade_order_id,
+       item_id,
+       item_price,
+       DATE_FORMAT(gmt_create, 'yyyy-MM-dd') AS dt
+FROM raw_orders
+WHERE order_status = 'verified'
+```
+The pipeline has been running well for the past few weeks. However, the data volume has grown rapidly recently,
+and the job's latency keeps increasing. To improve data freshness, users can
+- Suspend the streaming job with a savepoint ( see 
+  [Suspended State](https://nightlies.apache.org/flink/flink-docs-master/docs/internals/job_scheduling/) and 
+  [Stopping a Job Gracefully Creating a Final Savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/cli/) )
+  ```bash
+  $ ./bin/flink stop \
+        --savepointPath /tmp/flink-savepoints \
+        $JOB_ID
+  ```
+- Increase the bucket number
+  ```sql
+  -- scaling out
+  ALTER TABLE verified_orders SET ('bucket' = '32')
+  ```
+- Switch to batch mode and overwrite the current partition(s) to which the streaming job is writing
+  ```sql
+  -- suppose today is 2022-06-22
+  -- case 1: there is no late event updating the historical partitions, so overwriting today's partition is enough
+  INSERT OVERWRITE verified_orders PARTITION (dt = '2022-06-22')
+  SELECT trade_order_id,
+         item_id,
+         item_price
+  FROM verified_orders
+  WHERE dt = '2022-06-22'
+  
+  -- case 2: there are late events updating the historical partitions, but the range does not exceed 3 days
+  INSERT OVERWRITE verified_orders
+  SELECT trade_order_id,
+         item_id,
+         item_price,
+         dt
+  FROM verified_orders
+  WHERE dt IN ('2022-06-20', '2022-06-21', '2022-06-22')
+  ```
+- After the overwrite job finishes, restore the streaming job from the savepoint
+( see [Starting a Job from a Savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/cli/) )
+  ```bash
+  $ ./bin/flink run \
+      --fromSavepoint <savepointPath> \
+      ...
+  ```