Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2023/02/25 05:11:16 UTC

[GitHub] [rocketmq] TheR1sing3un created a discussion: Different implementations for supporting S3 backend for tiered storage

GitHub user TheR1sing3un created a discussion: Different implementations for supporting S3 backend for tiered storage

# Support S3 backend for TieredStorage

## Target

1. Implement Amazon S3 backend for TieredStorage.
2. Optimize upload and download efficiency.

## Implementation
>
> The following two interface methods need to be implemented.

![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1677229843427-8cc030fa-c6dd-4796-ab53-341fa722d2cc.png#averageHue=%231f2123&clientId=u5f535491-b955-4&from=paste&height=277&id=uf4a177a0&name=image.png&originHeight=554&originWidth=2116&originalType=binary&ratio=2&rotation=0&showTitle=false&size=57403&status=done&style=none&taskId=u756dfbe4-3489-41d2-9f90-8688893f868&title=&width=1058)

### Eliminated implementation

One `Segment` maps to a single object on S3.

#### Problem

When we need to call `commit0` to write the stream into the Segment's `[position, position+length-1]`, we hit a limitation: objects already uploaded to S3 cannot be modified, nor do they support append writes the way Alibaba Cloud OSS does, so updating an object means uploading a new object that overwrites the old one. Therefore, we would need to download the data in `[0, position-1]`, concatenate it with the incoming stream, and then upload the entire object again. This wastes both bandwidth and memory. Especially when the `Segment` approaches its default size of 1 GB, a single commit would require downloading nearly 1 GB of data, concatenating it in memory, and then uploading 1 GB of data. Therefore, this approach has been eliminated.
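For illustration only, here is a minimal sketch (not actual RocketMQ code) of what `commit0` would have to do under this eliminated design, using the AWS SDK for Java v2; the class and field names are hypothetical:

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.nio.ByteBuffer;

// Hypothetical sketch: one Segment == one S3 object, so every commit rewrites the whole object.
public class WholeObjectSegment {
    private final S3Client s3;
    private final String bucket;
    private final String key;

    public WholeObjectSegment(S3Client s3, String bucket, String key) {
        this.s3 = s3;
        this.bucket = bucket;
        this.key = key;
    }

    // Writing [position, position + data.length - 1] requires downloading [0, position - 1] first.
    public void commit0(long position, byte[] data) {
        byte[] prefix = new byte[0];
        if (position > 0) {
            prefix = s3.getObjectAsBytes(GetObjectRequest.builder()
                    .bucket(bucket).key(key)
                    .range("bytes=0-" + (position - 1))   // ranged GET of everything already written
                    .build()).asByteArray();
        }
        ByteBuffer whole = ByteBuffer.allocate(prefix.length + data.length);
        whole.put(prefix).put(data).flip();
        // The entire object (up to ~1 GB near the segment limit) goes back over the wire.
        s3.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(),
                RequestBody.fromByteBuffer(whole));
    }
}
```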

---

### Implementation 1

#### New configuration

| Configuration | Type | Unit | Default Value | Description |
| --- | --- | --- | --- | --- |
| s3Region | String |  |  | Region of the S3 service |
| s3Bucket | String |  |  | S3 bucket in which the objects are stored |
| s3AccessKey | String |  |  | IAM access key |
| s3SecretAccessKey | String |  |  | IAM secret access key |
| s3ChunkSize | long | bytes | `4 * 1024 * 1024` | size of one `chunk` in an `S3Segment` |
| readaheadChunkNum | int |  | 8 | number of `chunk`s read ahead on each `read0` call |
| s3ClientThreadNum | int |  | `tieredStoreGroupCommitSize` / `s3ChunkSize` | number of threads in the S3 client's thread pool |
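
For reference, a minimal sketch of how these settings could be wired into an AWS SDK for Java v2 client; the factory class and parameter names are hypothetical and do not reflect the actual tiered storage configuration class:

```java
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;

// Hypothetical factory: builds the async S3 client from the proposed configuration items.
public class S3ClientFactory {
    public static S3AsyncClient create(String s3Region, String s3AccessKey, String s3SecretAccessKey) {
        return S3AsyncClient.builder()
                .region(Region.of(s3Region))
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create(s3AccessKey, s3SecretAccessKey)))
                .build();
    }
}
```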


A segment is treated as a logical file that is divided into multiple physical objects in the S3 view. Each physical object is called a `chunk` and has a default size of 4 MB.

For ease of presentation, we assume that `readaheadChunkNum` is 2 in the following walkthrough.

#### Process
>
> Create `S3SegmentFile` object.

This is done in the `S3Segment#S3Segment` constructor. The path of the logical segment file is constructed by concatenating the following components: `clusterName/brokerName/topic/queue/type-baseOffset`. This path is referred to as `baseDir` below.
> Create the actual logical `segment`

That is, the `S3Segment#createFile()` method is called. Since no data has been written yet, we need to create the first `chunk` object and allocate 4MB of memory to cache the data for this chunk. We request the creation of an object from S3 in the format `baseDir/chunk-startOffset`, which means creating a `baseDir/chunk-0` object in S3 now.
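A rough sketch of these two steps, assuming a hypothetical `S3Segment` shape and the AWS SDK for Java v2; the real class in the proposal may look quite different:

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.nio.ByteBuffer;

// Hypothetical sketch of segment creation under Implementation 1.
public class S3Segment {
    static final long CHUNK_SIZE = 4L * 1024 * 1024;   // s3ChunkSize default

    final String baseDir;        // clusterName/brokerName/topic/queue/type-baseOffset
    ByteBuffer chunkBuffer;      // in-memory cache of the chunk currently being filled
    long chunkStartOffset;       // segment-relative offset at which the cached chunk begins

    S3Segment(String clusterName, String brokerName, String topic, int queueId,
              String type, long baseOffset) {
        this.baseDir = String.join("/", clusterName, brokerName, topic,
                String.valueOf(queueId), type + "-" + baseOffset);
    }

    // createFile(): materialize the first (empty) chunk object and its 4 MB cache.
    void createFile(S3Client s3, String bucket) {
        this.chunkStartOffset = 0;
        this.chunkBuffer = ByteBuffer.allocate((int) CHUNK_SIZE);
        s3.putObject(PutObjectRequest.builder()
                        .bucket(bucket)
                        .key(baseDir + "/chunk-0")      // baseDir/chunk-<startOffset>
                        .build(),
                RequestBody.empty());
    }
}
```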
> commit

The `Segment#commit0()` method is called.
We assume that 2 MB of data is written this time.
![](https://cdn.nlark.com/yuque/0/2023/jpeg/22446956/1677233255373-1cae7002-2e0d-477a-b759-052d286b9441.jpeg)
The data is written directly into `chunk-0` and uploaded to S3.
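A minimal sketch of this single-chunk case, continuing the hypothetical names above: the incoming bytes land in the cached 4 MB buffer and the filled prefix of `chunk-0` is re-uploaded as one object:

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.util.Arrays;

// Hypothetical continuation of the S3Segment sketch: a commit that fits inside the cached chunk.
class SingleChunkCommit {
    static void commit0(S3Client s3, String bucket, String baseDir,
                        byte[] chunkCache, int chunkWritePos, byte[] data, long chunkStartOffset) {
        // Copy the incoming bytes into the in-memory chunk cache ...
        System.arraycopy(data, 0, chunkCache, chunkWritePos, data.length);
        int newLength = chunkWritePos + data.length;
        // ... and upload the filled prefix of the chunk as baseDir/chunk-<startOffset>.
        s3.putObject(PutObjectRequest.builder()
                        .bucket(bucket)
                        .key(baseDir + "/chunk-" + chunkStartOffset)
                        .build(),
                RequestBody.fromBytes(Arrays.copyOf(chunkCache, newLength)));
    }
}
```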
> read

That is, the `S3Segment#read0()` method is called. Suppose we are currently reading 1MB of data with `position = 0` and `length = 1024`. The read then hits the local `chunk-0` buffer directly and returns.
> commit

Suppose this time we write data with `position = 2048, length = 12 * 1024`, that is, we commit 12MB of data starting from the 2MB position.
![](https://cdn.nlark.com/yuque/0/2023/jpeg/22446956/1677233908540-ac9b8eec-3cb0-4e96-8eea-d81f5182ab92.jpeg)
At this point, the first 2MB of `chunk-0` is cached locally, so we can directly concatenate it with the first 2MB of the incoming stream to form a complete `chunk-0`. The remaining stream is sliced into a full `chunk-4096`, a full `chunk-8192`, and the first 2MB of `chunk-12288`, and all of these are uploaded to S3. When multiple chunks are uploaded at the same time, we upload them asynchronously via a thread pool. If some chunks fail to upload, they are cached and retried asynchronously in the background; if they keep failing, appropriate handling logic is applied.
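A minimal sketch of a commit that crosses chunk boundaries, assuming hypothetical helper names and the AWS SDK v2 async client; handling of the locally cached prefix and of failed uploads is only hinted at in comments:

```java
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: a commit spanning several chunks is cut at chunk boundaries
// and the resulting chunk objects are uploaded concurrently.
class MultiChunkCommit {
    static CompletableFuture<Void> commit0(S3AsyncClient s3, String bucket, String baseDir,
                                           long chunkSize, long position, byte[] data) {
        List<CompletableFuture<?>> uploads = new ArrayList<>();
        int consumed = 0;
        long offset = position;
        while (consumed < data.length) {
            long chunkStart = (offset / chunkSize) * chunkSize;   // chunk this byte belongs to
            int room = (int) (chunkStart + chunkSize - offset);   // space left in that chunk
            int len = Math.min(room, data.length - consumed);
            byte[] piece = Arrays.copyOfRange(data, consumed, consumed + len);
            // NOTE: for the very first chunk the locally cached prefix (e.g. the existing 2 MB
            // of chunk-0 in the example above) would be prepended before uploading; omitted here.
            uploads.add(s3.putObject(PutObjectRequest.builder()
                            .bucket(bucket)
                            .key(baseDir + "/chunk-" + chunkStart)
                            .build(),
                    AsyncRequestBody.fromBytes(piece)));
            consumed += len;
            offset += len;
        }
        // Failed chunks would be cached and retried asynchronously, as described above.
        return CompletableFuture.allOf(uploads.toArray(new CompletableFuture[0]));
    }
}
```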
> read

After the above commit, only the first 2MB of `chunk-12288` is cached locally. Now we read data with `position = 2048, length = 4096`, which means reading the second half of `chunk-0` and the first half of `chunk-4096`. Since the read-ahead mechanism is enabled with a parameter of 2, we need to read two more chunks. Considering that we only read half of `chunk-4096`, we only need to read one additional chunk, which is `chunk-8192`.
![](https://cdn.nlark.com/yuque/0/2023/jpeg/22446956/1677234407065-a8965fc7-f3d9-4d32-b583-4f98ea485df4.jpeg)
Then we read `chunk-0`, `chunk-4096`, and `chunk-8192` from S3. According to the pre-reading mechanism, we do not save `chunk-0` and only save `chunk-4096` and `chunk-8192` in memory.
![](https://cdn.nlark.com/yuque/0/2023/jpeg/22446956/1677234743263-325a8f2a-2958-45cb-912d-08843b827fb7.jpeg)
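A minimal sketch of `read0` with read-ahead, assuming hypothetical names; the exact accounting of the read-ahead budget (e.g. counting a partially consumed chunk toward it, as in the example above) is simplified, and error handling for chunks past the segment tail is omitted:

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: fetch the chunks covering the requested range plus up to
// readaheadChunkNum further chunks, and keep only the read-ahead chunks cached.
class ChunkedReader {
    final Map<Long, byte[]> chunkCache = new ConcurrentHashMap<>();  // chunkStartOffset -> bytes

    byte[] read0(S3Client s3, String bucket, String baseDir, long chunkSize,
                 int readaheadChunkNum, long position, int length) {
        long end = position + length;                                // exclusive end of the range
        long firstChunk = (position / chunkSize) * chunkSize;
        long lastChunk = ((end - 1) / chunkSize) * chunkSize;
        // First chunk whose data extends beyond the request; it starts the read-ahead window.
        long firstKeep = (end % chunkSize == 0) ? lastChunk + chunkSize : lastChunk;
        long windowEnd = firstKeep + (long) readaheadChunkNum * chunkSize;

        ByteArrayOutputStream result = new ByteArrayOutputStream(length);
        for (long start = firstChunk; start < windowEnd; start += chunkSize) {
            byte[] chunk = chunkCache.get(start);
            if (chunk == null) {
                chunk = s3.getObjectAsBytes(GetObjectRequest.builder()
                        .bucket(bucket).key(baseDir + "/chunk-" + start).build()).asByteArray();
            }
            if (start >= firstKeep) {
                chunkCache.put(start, chunk);       // part of the read-ahead window: keep cached
            } else {
                chunkCache.remove(start);           // fully consumed by this read: drop it
            }
            if (start <= lastChunk) {
                int from = (int) Math.max(position - start, 0);
                int to = (int) Math.min(end - start, chunk.length);
                result.write(chunk, from, to - from);
            }
        }
        return result.toByteArray();
    }
}
```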
> read

Now we read data with `position = 6144, length = 4096`, which means reading the second half of `chunk-4096` and the first half of `chunk-8192`. Since we have pre-loaded `chunk-4096` and `chunk-8192` into memory, we can return the data directly without reading from S3.
> `Segment` is full

At this point, we can asynchronously trigger an `uploadPartCopy` operation to consolidate all the `chunk`s into a single large `segment` object, and record the basic information of the `segment` in the object metadata. The object path is `clusterName/brokerName/topic/queue/type-baseOffset-seg`. After the copy succeeds, we can asynchronously delete the chunk objects under the original path.
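A minimal sketch of this consolidation step with the AWS SDK v2 (`CreateMultipartUpload` + `UploadPartCopy` + `CompleteMultipartUpload`); class and method names other than the SDK's are hypothetical:

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: server-side consolidation of chunk objects into one segment object
// via multipart upload + UploadPartCopy (no chunk data flows through the broker).
class SegmentConsolidator {
    // chunkKeys must be ordered by start offset. Note that S3 requires every part except the
    // last to be at least 5 MiB, which constrains the usable chunk size for this copy.
    static void consolidate(S3Client s3, String bucket, List<String> chunkKeys, String segKey) {
        String uploadId = s3.createMultipartUpload(CreateMultipartUploadRequest.builder()
                .bucket(bucket).key(segKey).build()).uploadId();

        List<CompletedPart> parts = new ArrayList<>();
        for (int i = 0; i < chunkKeys.size(); i++) {
            int partNumber = i + 1;                                  // part numbers start at 1
            String eTag = s3.uploadPartCopy(UploadPartCopyRequest.builder()
                    .sourceBucket(bucket).sourceKey(chunkKeys.get(i))
                    .destinationBucket(bucket).destinationKey(segKey)
                    .uploadId(uploadId).partNumber(partNumber)
                    .build()).copyPartResult().eTag();
            parts.add(CompletedPart.builder().partNumber(partNumber).eTag(eTag).build());
        }

        s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                .bucket(bucket).key(segKey).uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
                .build());
        // The chunk objects under baseDir can now be deleted asynchronously, as described above.
    }
}
```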
> restart with `segment`

Now we construct the path `clusterName/brokerName/topic/queue/type-baseOffset-seg` and check whether the object exists. If it exists, it is the already consolidated large `Segment` object; we record the corresponding metadata locally, and `read0()` can read the object directly based on the offset. Next, we check whether any objects remain under `.../type-baseOffset`. If so, the asynchronous deletion has not completed yet, so we retry it asynchronously.
> restart with a non-existent `segment`

If the path `.../type-baseOffset-seg` does not exist, it may be due to failed consolidation or the current `segment` has not been written to capacity. In this case, we can list all the chunk files under the path and then determine if the segment has been fully written (this can be checked by adding an interface that is called during recovery). If the `segment` has been fully written, we can consolidate the `chunks` asynchronously and then delete them. If the `segment` has not been fully written, we can simply recover the latest `chunk` by caching it.
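A minimal sketch of the two restart paths, assuming hypothetical names; the local metadata recording and the recovery-time "is the segment full" interface mentioned above are only indicated in comments:

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.S3Object;

import java.util.List;

// Hypothetical sketch of the restart path: prefer the consolidated "-seg" object,
// otherwise fall back to listing the remaining chunk objects.
class SegmentRecovery {
    static void recover(S3Client s3, String bucket, String baseDir) {
        String segKey = baseDir + "-seg";
        try {
            HeadObjectResponse seg = s3.headObject(HeadObjectRequest.builder()
                    .bucket(bucket).key(segKey).build());
            // Consolidated segment exists: record seg.contentLength() and its metadata locally;
            // read0 then serves reads with ranged GETs against segKey. Any leftover chunk
            // objects under baseDir can be deleted asynchronously.
            return;
        } catch (NoSuchKeyException e) {
            // Fall through: consolidation never happened or the segment is still being written.
        }
        List<S3Object> chunks = s3.listObjectsV2(ListObjectsV2Request.builder()
                .bucket(bucket).prefix(baseDir + "/chunk-").build()).contents();
        // If the segment was already full (checked via the recovery-time interface mentioned
        // above), trigger consolidation; otherwise re-cache the latest chunk and keep appending.
    }
}
```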

#### Advantages and disadvantages
>
> Advantages

1. The pre-fetching mechanism and the caching of incomplete chunks can help reduce network requests.
2. The optimal use case for this design is sequential reads, which make full use of the pre-fetching mechanism.

> Disadvantages

1. `chunk` caches can lead to excessive memory usage. With 1000 queues, even if only one `chunk` is cached per queue, memory usage can reach 4GB.


### Implementation 2

To solve the memory-occupation problem, we can use a singleton `bufferPool` with a limited total size to cache `chunk`s.
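A minimal sketch of such a bounded, shared pool (hypothetical names, not an actual RocketMQ class); callers that cannot obtain a buffer simply skip caching/read-ahead for that call:

```java
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Hypothetical sketch of a process-wide, size-bounded pool of chunk buffers.
public class ChunkBufferPool {
    private final int chunkSize;
    private final Semaphore permits;                    // caps total pooled memory
    private final Queue<ByteBuffer> free = new ConcurrentLinkedQueue<>();

    public ChunkBufferPool(int chunkSize, long maxPoolBytes) {
        this.chunkSize = chunkSize;
        this.permits = new Semaphore((int) (maxPoolBytes / chunkSize));
    }

    /** Returns a buffer, or null if the pool is exhausted (caller falls back to no caching). */
    public ByteBuffer tryAcquire() {
        if (!permits.tryAcquire()) {
            return null;
        }
        ByteBuffer buf = free.poll();
        return buf != null ? buf : ByteBuffer.allocateDirect(chunkSize);
    }

    /** Returns a buffer to the pool once its chunk has been uploaded or evicted. */
    public void release(ByteBuffer buf) {
        buf.clear();
        free.offer(buf);
        permits.release();
    }
}
```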

#### Advantages and disadvantages
>
> Advantages

1. Prevents OOM problems.
2. Suitable for cases where the total number of queues is small.

> Disadvantages

1. Not suitable for scenarios with a large number of queues: fierce cache competition between queues means each queue actually pre-reads few or no `chunk`s, so the desired read-ahead effect is not achieved.

### Implementation 3

Instead of using the chunking method above, we can directly use S3's multipart upload to upload a file in parts. In S3, one physical object corresponds to one `Segment`. When a `Segment` is created, a multipart upload task is started and its upload ID is also persisted to S3. Every time `commit0()` is called, a `part` is uploaded, and the part number identifying this upload is persisted to S3 as well. When the entire `Segment` is full, the multipart upload is completed.

Fault recovery:
Based on the persisted upload ID, we can retrieve the highest part number uploaded so far. The next time `commit0()` is called, uploading starts from the part number immediately after it.
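A minimal sketch of this approach with the AWS SDK v2; how the upload ID and part numbers are persisted to S3 is left out, and all names other than the SDK's are hypothetical:

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.ListPartsRequest;
import software.amazon.awssdk.services.s3.model.Part;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;

import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of Implementation 3: one multipart upload per Segment,
// one part per commit0 call, and recovery of the next part number via ListParts.
class MultipartSegment {
    static String createSegment(S3Client s3, String bucket, String segKey) {
        // The returned uploadId must also be persisted (e.g. as a small marker object in S3).
        return s3.createMultipartUpload(CreateMultipartUploadRequest.builder()
                .bucket(bucket).key(segKey).build()).uploadId();
    }

    static void commit0(S3Client s3, String bucket, String segKey, String uploadId,
                        int partNumber, byte[] data) {
        // Each commit becomes one part; parts stay invisible until the upload is completed.
        s3.uploadPart(UploadPartRequest.builder()
                        .bucket(bucket).key(segKey).uploadId(uploadId).partNumber(partNumber).build(),
                RequestBody.fromBytes(data));
    }

    /** Fault recovery: the next commit starts at maxUploadedPartNumber + 1. */
    static int nextPartNumber(S3Client s3, String bucket, String segKey, String uploadId) {
        return s3.listParts(ListPartsRequest.builder()
                        .bucket(bucket).key(segKey).uploadId(uploadId).build())
                .parts().stream().mapToInt(Part::partNumber).max().orElse(0) + 1;
    }

    /** Called when the Segment is full; only now does the object become readable. */
    static void seal(S3Client s3, String bucket, String segKey, String uploadId) {
        // Pagination of ListParts is ignored for brevity.
        List<CompletedPart> parts = s3.listParts(ListPartsRequest.builder()
                        .bucket(bucket).key(segKey).uploadId(uploadId).build())
                .parts().stream()
                .map(p -> CompletedPart.builder().partNumber(p.partNumber()).eTag(p.eTag()).build())
                .collect(Collectors.toList());
        s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                .bucket(bucket).key(segKey).uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
                .build());
    }
}
```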

#### Advantages and disadvantages
>
> Advantages

1. The implementation is simple.
2. This approach is suitable for cold-data backup scenarios where a large amount of data is written once and read requests are infrequent and dispersed, because such workloads avoid prolonged read failures caused by the data being invisible while the multipart upload is in progress.

> Disadvantages

1. It is not possible to access the object until the entire Segment has been successfully uploaded. In other words, the data in the `Segment` cannot be read while the Segment is being uploaded.

### Plan

1. Implement `S3Segment` and the corresponding methods to cover the basic storage requirements.
2. Add more metrics for `S3Segment`.
3. Optimize based on actual usage scenarios.


GitHub link: https://github.com/apache/rocketmq/discussions/6176

----


[GitHub] [rocketmq] ShadowySpirits added a comment to the discussion: Different implementations for supporting S3 backend for tiered storage

Posted by GitBox <gi...@apache.org>.
GitHub user ShadowySpirits added a comment to the discussion: Different implementations for supporting S3 backend for tiered storage

We should not add the pre-fetching mechanism in the backend service provider. This is similar to the file system in an OS: the underlying interface reads files according to the specified position and length, while the OS maintains the page cache; in tiered storage, the read-ahead cache plays that role (see [TieredMessageFetcher](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java) for more details).

If you want to batch uploads, a shared upload buffer is okay. But once the messages are uploaded to S3, the data in the buffer should be cleared immediately, because we do not expect to access hot data from tiered storage.

GitHub link: https://github.com/apache/rocketmq/discussions/6176#discussioncomment-5109325
