You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by Yubiao Feng <yu...@streamnative.io.INVALID> on 2022/06/10 09:34:56 UTC

[DISCUSS] [PIP-160] Batch writing ledger for transaction operation

Hi Pulsar community:

I open a pip to discuss "Batch writing ledger for transaction operation"

Proposal Link: https://github.com/apache/pulsar/issues/15370

## Motivation

Before reading the background, I suggest you read section “Transaction
Flow” of [PIP-31: Transactional Streaming](
https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
)

### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow </p>
![MG3](
https://user-images.githubusercontent.com/25195800/172985866-25e496a4-ea93-42ec-aa0d-e6a02aa0635e.jpeg
)
In *Figure 1. Normal Flow vs. Transaction Flow*:
- The gray square boxes represent logical components.
- All the blue boxes represent logs. The logs are usually Managed ledger
- Each arrow represents the request flow or message flow. These operations
occur in sequence indicated by the numbers next to each arrow.
- The black arrows indicate those shared by transaction and normal flow.
- The blue arrows represent normal-message-specific flow.
- The orange arrows represent transaction-message-specific flow.
- The sections below are numbered to match the operations showed in the
diagram(differ from [PIP-31: Transactional Streaming](
https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
))


#### 2.4a Write logs to ledger which Acknowledgement State is PENDING_ACK
[Acknowledgement State Machine](
https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u)
tells about the changes of the Acknowledge State and why we need persistent
“The Log which the Acknowledgement State is PENDING_ACK”.
#### 2.4a’ Mark messages is no longer useful with current subscription
Update `Cursor` to mark the messages as DELETED. So they can be deleted.
#### 3.2b Mark messages is no longer useful with current subscription
The implementation here is exactly the same as 2.4a’, except that the
execution is triggered later, after the Transaction has been committed.


### Analyze the performance cost of transaction
As you can see <a href="#normalFlowVsTransaction">Figure 1. Normal Flow vs.
Transaction Flow]</a>: 2.4a 'and 3.2b are exactly the same logic, so the
remaining orange arrows are the additional performance overhead of all
transactions.
In terms of whether or not each transaction is executed multiple times, we
can split the flow into two classes(Optimizing a process that is executed
multiple times will yield more benefits):
- Executed once each transaction: flow-1.x and flow-3.x
- Executed multiple times each transaction: flow-2.x

So optimizing the flow 2.x with a lot of execution is a good choice. Let's
split flow-2.x into two groups: those that cost more and those that cost
less:
- No disk written: flow-2.1 and fow-2.3
- Disk written: fow-2.1a, fow-2.3a, flow-2.4a

From the previous analysis, we found that optimizing flow-2.1a, flow-2.3a,
flow-2.4a would bring the most benefits, and batch writes would be an
excellent solution for multiple disk writes. Flow-2.1a and Flow-2.3a are
both manipulations written into the transaction log, we can combine them in
one batch; 2.4a is the operation of writing pending ACK log, we combine
multiple 2.4a's into one batch for processing.
As we can see from “Transaction Flow” of [PIP-31: Transactional Streaming](
https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx),
these instructions are strictly sequential (guaranteed by the client):
- flow-1.x end before flow-2.x start
- flow-2.x end before flow-3.x start
- flow-3.1a end before flow-3.3a start

Therefore, the broker does not need to worry about the dependency of these
flows, we can also put flow-1a flow-31a and flow-3.3a into The Transaction
Log Batch too.

## Goal
Provide a mechanism for Transaction Log Store and Pending Ack Store: accept
multiple write requests, buffer all those records, and persist to a single
BK entry(aka “Batched Entry”). This will improve broker->BK throughput.
- Allow users to specify control of the max size, max record of The Buffer.
- Allow users to specify control max delay time of The Write Request.
- Multiple raw data can be recovered from Batched Entry.
- Configurable “batched implementation” and “common implementation” switch.

## Approach
### Buffer requests and write Bookie
Create a new protobuf record called “Batched Transaction Data” with an
array inside. When receive a request, we put it in the array.

Request:
```
[Request 1]{ data, callback }
[Request 2]]{ data, callback }
…
…
[Request N]]{ data, callback }
```
Buffer:
```
[BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
```
Write Bookie:
```
LedgerHandle async write ( BatchedTransactionData to byteBuf )
LedgerHandle callback: ledgerId=1, entryId=1
```
Request-Callback:
```
Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
…
…
Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
```

### Delete BatchedTransactionMeta
[PIP 45](
https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level)
has supported batch index delete. So the Raw Data added to a batch can be
with different batch indexes but with the same ledger ID and entry ID.

Read:
```
[BatchedTransactionData]
```
After split:
```
{data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
{data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
…
{data 3, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
```
Users can delete whole of the batched Entry:
```java
cursor.delete( Position {ledgerId = 1, entryId = 1} )
```
Users can also delete only part of the batched Entry:
```java
cursor.delete( Position {ledgerId =1, entryId = 1, batchIndex=1} )
```

## Changes

### Protocol Changes
New protobuf record to buffer requests.

BatchedTransactionMetadataEntry
```
message BatchedTransactionMetadataEntry{
  // Array for buffer transaction log data.
  repeated TransactionMetadataEntry transaction_log = 12;
}
```

BatchedPendingAckMetadataEntry
```
message BatchedPendingAckMetadataEntry{
  // Array for buffer pending ack data.
  repeated PendingAckMetadataEntry pending_ack_log=6;
}
```

Note: To ensure forward compatibility, we need to distinguish the old
TransactionMetadataEntry/PendingAckMetadataEntry data from the new
BatchedTransactionData data, and we add A magic number in front of the
bytes that proto serializes:
```
[Magic Num] [PendingAckMetadataEntry proto bytes]  ==>  [Enrty]
```
Read Entry:
```
                           /-- true --> [BatchedTransactionMetadataEntry]
[Entry] --> has Magic Num ?
                           \-- false --> [TransactionMetadataEntry]
```


### API Changes

BatchAddDataCallback
The Transaction Coordinator does not directly operate the Managed Ledger,
uses the Transaction Log Store to operate on Managed Ledger. The Managed
Ledger write API provides a callback class: AddEntryCallback, the same
Transaction Log Store that provides bulk writes, provides a callback class:
BatchAddDataCallback. <a href="#BatchedAddDataCallbackExplains">Explains
why do we need BatchAddDataCallback </a>.

![WechatIMG7](
https://user-images.githubusercontent.com/25195800/173034341-8d44a8b1-9dde-45ee-8525-b72365def640.jpeg
)
Figure.BatchAddDataCallback in Write Flow

```java
interface BatchAddDataCallback {
    /**
     * Successed callback function for “add data asynchronously”
     *
     * @param posotion A Position is a pointer to a specific entry into the
managed ledger.
     * @param byteBuf The raw data which added.
     * @param batchIndex Raw data count in The whole Batched Entry.
     * @param batchSize The current raw data index in the batch.
     * @param ctx opaque context
     */
    void addComplete(Position position, ByteBuf byteBuf, int batchIndex,
int batchSize, Object context);
    /**
     * Failure callback function for “add data asynchronously”
     *
     * @param ctx opaque context
     */
    void addFailed(ManagedLedgerException exception, Object ctx);
}
```

### Configuration Changes
Add the Batch threshold parameters to control the refresh frequency.

broker.conf
```
transactionLogBatchedWriteEnabled = false;
transactionLogBatchedWriteMaxRecords= 512;
transactionLogBatchedWriteMaxSize= 1024 * 1024 * 4;
transactionLogBatchedWriteMaxDelayInMillis= 1;

pendingAckBatchedWriteEnabled = false;
pendingAckBatchedWriteMaxRecords= 512;
pendingAckBatchedWriteMaxSize= 1024 * 1024 * 4;
pendingAckBatchedWriteMaxDelayInMillis= 1;
```

### Compatibility
After the batch feature is enabled, users can only downgrade to the larger
than “first version that supports BatchedTransactionMeta reading” to
consume data. Data in a lower version broker cannot be parsed, resulting in
data loss. We also provide <a href="#librarySupports"> Library support for
Compatibility with older versions Broker</a>, If the user uses this library
on older version Broker<sup>[0]</sup>, all new data results can be
processed correctly and none of the data will be lost.

----
**[0]old version Broker**: Not less than 2.9.2 and 2.10

### Observability
When using the Batch feature, users will adjust the frequency of disk
brushing to achieve the optimal performance. We provide two observable
indicators for users' reference

```
BatchedDataStoreMXBeanImpl {
    /** The number of logs in each batch. **/
    Rate batchRecordCount;
    /** The size of each batch. **/
    Rate batchSizeCount;
}
```

## Test plan
The test should cover the following cases:

- The batch mechanism works abides by the total count, total size, and max
delay limitation.
- The returned position for writing data is correct.
- The managedCursor can delete and mark delete the BatchedTransactionMeta.
- Performance tests and compare before-after improvement.

## The appendix

### <p id="BatchedAddDataCallbackExplains"> Explains why do we need
BatchAddDataCallback  </p>
After all produced messages and acknowledgements to all partitions are
committed or aborted, the TC writes the final COMMITTED or ABORTED
transaction status message to its transaction log, indicating that the
transaction is complete (shown as 3.3a in the diagram). At this point, all
the messages pertaining to the transaction in its transaction log can
safely be removed.
e.g. There are two transactions:
![截屏2022-06-10 11 56 49](
https://user-images.githubusercontent.com/25195800/172987382-fc4ddf9a-e21c-437f-900b-cd681d8d9364.png
)
Transaction Log Write:
```
transaction_1: start transaction
transaction_2: start transaction
transaction_1: add partition to tx
transaction_1: add subscription to tx
transaction_2: add partition to tx
transaction_1: commit
```
Bookie Write:
```
[Entry]{ BatchedTransactionData={LogRecordSize=6} }
```
Bookie Response:
```
{ledgerId=2, entryId=3}
```
Transaction Log callback:
```
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5, ctx}
```
The entry(2,3) actually has 6 transaction logs, transaction_1 relations to
[0,2,3,5] and transaction_2 relations to [1,4].
After transaction_1 is committed,the logs [0,2,3,5] of Entry(2,3) are not
needed because the transaction has already been completed, but now we could
not delete Entry(2,3), Because the logs [1,4] are still useful that
transaction_2 is not finished and we still need them for the recovery
operation.
The BatchIndex and BatchSize can clearly indicate the location of each
transaction log in the ledger. When the transaction log is no longer used,
Users can accurately delete it according to position and batchIndex.

### <p id="librarySupports">Library support for Compatibility with older
versions Broker</p>
In broker.conf we can configure the
[transactionMetadataStoreProviderClassName](
https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535)
to replace the implementation of TransactionLog, we can also configure the
[transactionPendingAckStoreProviderClassName](
https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549)
to replace the implementation of PendingAckStore,
We will provide a library containing the classes which can read batched
transaction logs and pending ack logs:

#### TransactionLogBatchEnryReadableImpl
```java
public class TransactionLogBatchEnryReadableImpl extends
MLTransactionLogImpl {
    /**
     * Different from the parent class, when this method reads an Entry, it
can identify
     * whether the Entry is transction log or batched transaction log. If
the Entry is a
    * transaction log, it maintains the same logic as the parent class. If
the transaction log
    * is batched, it will be split into transaction log and processed
according to the original
    * logic
     */
    @Override
    void replayAsync(TransactionLogReplayCallback
transactionLogReplayCallback);
}
```

#### PendingAckStoreBatchEntryReadableImpl
```java
public class PendingAckStoreBatchEntryReadableImpl extends
MLPendingAckStore {
    /**
     * Different from the parent class, when this method reads an Entry, it
can identify
     * whether the Entry is pending ack log or batched pending ack log. If
the Entry is a
    * pending ack log, it maintains the same logic as the parent class. If
the pending ack
    * log is batched, it will be split into pending ack log and processed
according to the
    * original logic
     */
    void replayAsync(pendingAckHandle, executorService);
}
```

How to use this library
1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib
2. Edit broker.conf. Set transactionMetadataStoreProviderClassName is
“org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”,
set transactionPendingAckStoreProviderClassName is
“org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
3. Restart broker.

Re: [DISCUSS] [PIP-160] Batch writing ledger for transaction operation

Posted by PengHui Li <pe...@apache.org>.
+1

Penghui

On Fri, Jun 17, 2022 at 11:40 AM Yubiao Feng
<yu...@streamnative.io.invalid> wrote:

> Hi  Enrico
>
> > I am not sure I understand the part of making it configurable via a
> classname.
> I believe it is better to simply have a flag
> "transactionEnableBatchWrites".
> Otherwise the matrix of possible implementations will grow without limits.
>
> Good idea, I've modified the design and added a switch in the Configure
> Changes section. Could you take a look again.
>
> On Fri, Jun 10, 2022 at 7:14 PM Enrico Olivelli <eo...@gmail.com>
> wrote:
>
> > I have read the PIP, and overall I agree with the design.
> > Good work !
> >
> > I am not sure I understand the part of making it configurable via a
> > classname.
> > I believe it is better to simply have a flag
> > "transactionEnableBatchWrites".
> > Otherwise the matrix of possible implementations will grow without
> limits.
> >
> > Enrico
> >
> > Il giorno ven 10 giu 2022 alle ore 11:35 Yubiao Feng
> > <yu...@streamnative.io.invalid> ha scritto:
> > >
> > > Hi Pulsar community:
> > >
> > > I open a pip to discuss "Batch writing ledger for transaction
> operation"
> > >
> > > Proposal Link: https://github.com/apache/pulsar/issues/15370
> > >
> > > ## Motivation
> > >
> > > Before reading the background, I suggest you read section “Transaction
> > > Flow” of [PIP-31: Transactional Streaming](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > )
> > >
> > > ### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow
> > </p>
> > > ![MG3](
> > >
> >
> https://user-images.githubusercontent.com/25195800/172985866-25e496a4-ea93-42ec-aa0d-e6a02aa0635e.jpeg
> > > )
> > > In *Figure 1. Normal Flow vs. Transaction Flow*:
> > > - The gray square boxes represent logical components.
> > > - All the blue boxes represent logs. The logs are usually Managed
> ledger
> > > - Each arrow represents the request flow or message flow. These
> > operations
> > > occur in sequence indicated by the numbers next to each arrow.
> > > - The black arrows indicate those shared by transaction and normal
> flow.
> > > - The blue arrows represent normal-message-specific flow.
> > > - The orange arrows represent transaction-message-specific flow.
> > > - The sections below are numbered to match the operations showed in the
> > > diagram(differ from [PIP-31: Transactional Streaming](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > ))
> > >
> > >
> > > #### 2.4a Write logs to ledger which Acknowledgement State is
> PENDING_ACK
> > > [Acknowledgement State Machine](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u
> > )
> > > tells about the changes of the Acknowledge State and why we need
> > persistent
> > > “The Log which the Acknowledgement State is PENDING_ACK”.
> > > #### 2.4a’ Mark messages is no longer useful with current subscription
> > > Update `Cursor` to mark the messages as DELETED. So they can be
> deleted.
> > > #### 3.2b Mark messages is no longer useful with current subscription
> > > The implementation here is exactly the same as 2.4a’, except that the
> > > execution is triggered later, after the Transaction has been committed.
> > >
> > >
> > > ### Analyze the performance cost of transaction
> > > As you can see <a href="#normalFlowVsTransaction">Figure 1. Normal Flow
> > vs.
> > > Transaction Flow]</a>: 2.4a 'and 3.2b are exactly the same logic, so
> the
> > > remaining orange arrows are the additional performance overhead of all
> > > transactions.
> > > In terms of whether or not each transaction is executed multiple times,
> > we
> > > can split the flow into two classes(Optimizing a process that is
> executed
> > > multiple times will yield more benefits):
> > > - Executed once each transaction: flow-1.x and flow-3.x
> > > - Executed multiple times each transaction: flow-2.x
> > >
> > > So optimizing the flow 2.x with a lot of execution is a good choice.
> > Let's
> > > split flow-2.x into two groups: those that cost more and those that
> cost
> > > less:
> > > - No disk written: flow-2.1 and fow-2.3
> > > - Disk written: fow-2.1a, fow-2.3a, flow-2.4a
> > >
> > > From the previous analysis, we found that optimizing flow-2.1a,
> > flow-2.3a,
> > > flow-2.4a would bring the most benefits, and batch writes would be an
> > > excellent solution for multiple disk writes. Flow-2.1a and Flow-2.3a
> are
> > > both manipulations written into the transaction log, we can combine
> them
> > in
> > > one batch; 2.4a is the operation of writing pending ACK log, we combine
> > > multiple 2.4a's into one batch for processing.
> > > As we can see from “Transaction Flow” of [PIP-31: Transactional
> > Streaming](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > ),
> > > these instructions are strictly sequential (guaranteed by the client):
> > > - flow-1.x end before flow-2.x start
> > > - flow-2.x end before flow-3.x start
> > > - flow-3.1a end before flow-3.3a start
> > >
> > > Therefore, the broker does not need to worry about the dependency of
> > these
> > > flows, we can also put flow-1a flow-31a and flow-3.3a into The
> > Transaction
> > > Log Batch too.
> > >
> > > ## Goal
> > > Provide a mechanism for Transaction Log Store and Pending Ack Store:
> > accept
> > > multiple write requests, buffer all those records, and persist to a
> > single
> > > BK entry(aka “Batched Entry”). This will improve broker->BK throughput.
> > > - Allow users to specify control of the max size, max record of The
> > Buffer.
> > > - Allow users to specify control max delay time of The Write Request.
> > > - Multiple raw data can be recovered from Batched Entry.
> > > - Configurable “batched implementation” and “common implementation”
> > switch.
> > >
> > > ## Approach
> > > ### Buffer requests and write Bookie
> > > Create a new protobuf record called “Batched Transaction Data” with an
> > > array inside. When receive a request, we put it in the array.
> > >
> > > Request:
> > > ```
> > > [Request 1]{ data, callback }
> > > [Request 2]]{ data, callback }
> > > …
> > > …
> > > [Request N]]{ data, callback }
> > > ```
> > > Buffer:
> > > ```
> > > [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> > > ```
> > > Write Bookie:
> > > ```
> > > LedgerHandle async write ( BatchedTransactionData to byteBuf )
> > > LedgerHandle callback: ledgerId=1, entryId=1
> > > ```
> > > Request-Callback:
> > > ```
> > > Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > > Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > > …
> > > …
> > > Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > > ```
> > >
> > > ### Delete BatchedTransactionMeta
> > > [PIP 45](
> > >
> >
> https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level
> > )
> > > has supported batch index delete. So the Raw Data added to a batch can
> be
> > > with different batch indexes but with the same ledger ID and entry ID.
> > >
> > > Read:
> > > ```
> > > [BatchedTransactionData]
> > > ```
> > > After split:
> > > ```
> > > {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > > {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > > …
> > > {data 3, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > > ```
> > > Users can delete whole of the batched Entry:
> > > ```java
> > > cursor.delete( Position {ledgerId = 1, entryId = 1} )
> > > ```
> > > Users can also delete only part of the batched Entry:
> > > ```java
> > > cursor.delete( Position {ledgerId =1, entryId = 1, batchIndex=1} )
> > > ```
> > >
> > > ## Changes
> > >
> > > ### Protocol Changes
> > > New protobuf record to buffer requests.
> > >
> > > BatchedTransactionMetadataEntry
> > > ```
> > > message BatchedTransactionMetadataEntry{
> > >   // Array for buffer transaction log data.
> > >   repeated TransactionMetadataEntry transaction_log = 12;
> > > }
> > > ```
> > >
> > > BatchedPendingAckMetadataEntry
> > > ```
> > > message BatchedPendingAckMetadataEntry{
> > >   // Array for buffer pending ack data.
> > >   repeated PendingAckMetadataEntry pending_ack_log=6;
> > > }
> > > ```
> > >
> > > Note: To ensure forward compatibility, we need to distinguish the old
> > > TransactionMetadataEntry/PendingAckMetadataEntry data from the new
> > > BatchedTransactionData data, and we add A magic number in front of the
> > > bytes that proto serializes:
> > > ```
> > > [Magic Num] [PendingAckMetadataEntry proto bytes]  ==>  [Enrty]
> > > ```
> > > Read Entry:
> > > ```
> > >                            /-- true -->
> [BatchedTransactionMetadataEntry]
> > > [Entry] --> has Magic Num ?
> > >                            \-- false --> [TransactionMetadataEntry]
> > > ```
> > >
> > >
> > > ### API Changes
> > >
> > > BatchAddDataCallback
> > > The Transaction Coordinator does not directly operate the Managed
> Ledger,
> > > uses the Transaction Log Store to operate on Managed Ledger. The
> Managed
> > > Ledger write API provides a callback class: AddEntryCallback, the same
> > > Transaction Log Store that provides bulk writes, provides a callback
> > class:
> > > BatchAddDataCallback. <a
> href="#BatchedAddDataCallbackExplains">Explains
> > > why do we need BatchAddDataCallback </a>.
> > >
> > > ![WechatIMG7](
> > >
> >
> https://user-images.githubusercontent.com/25195800/173034341-8d44a8b1-9dde-45ee-8525-b72365def640.jpeg
> > > )
> > > Figure.BatchAddDataCallback in Write Flow
> > >
> > > ```java
> > > interface BatchAddDataCallback {
> > >     /**
> > >      * Successed callback function for “add data asynchronously”
> > >      *
> > >      * @param posotion A Position is a pointer to a specific entry into
> > the
> > > managed ledger.
> > >      * @param byteBuf The raw data which added.
> > >      * @param batchIndex Raw data count in The whole Batched Entry.
> > >      * @param batchSize The current raw data index in the batch.
> > >      * @param ctx opaque context
> > >      */
> > >     void addComplete(Position position, ByteBuf byteBuf, int
> batchIndex,
> > > int batchSize, Object context);
> > >     /**
> > >      * Failure callback function for “add data asynchronously”
> > >      *
> > >      * @param ctx opaque context
> > >      */
> > >     void addFailed(ManagedLedgerException exception, Object ctx);
> > > }
> > > ```
> > >
> > > ### Configuration Changes
> > > Add the Batch threshold parameters to control the refresh frequency.
> > >
> > > broker.conf
> > > ```
> > > transactionLogBatchedWriteEnabled = false;
> > > transactionLogBatchedWriteMaxRecords= 512;
> > > transactionLogBatchedWriteMaxSize= 1024 * 1024 * 4;
> > > transactionLogBatchedWriteMaxDelayInMillis= 1;
> > >
> > > pendingAckBatchedWriteEnabled = false;
> > > pendingAckBatchedWriteMaxRecords= 512;
> > > pendingAckBatchedWriteMaxSize= 1024 * 1024 * 4;
> > > pendingAckBatchedWriteMaxDelayInMillis= 1;
> > > ```
> > >
> > > ### Compatibility
> > > After the batch feature is enabled, users can only downgrade to the
> > larger
> > > than “first version that supports BatchedTransactionMeta reading” to
> > > consume data. Data in a lower version broker cannot be parsed,
> resulting
> > in
> > > data loss. We also provide <a href="#librarySupports"> Library support
> > for
> > > Compatibility with older versions Broker</a>, If the user uses this
> > library
> > > on older version Broker<sup>[0]</sup>, all new data results can be
> > > processed correctly and none of the data will be lost.
> > >
> > > ----
> > > **[0]old version Broker**: Not less than 2.9.2 and 2.10
> > >
> > > ### Observability
> > > When using the Batch feature, users will adjust the frequency of disk
> > > brushing to achieve the optimal performance. We provide two observable
> > > indicators for users' reference
> > >
> > > ```
> > > BatchedDataStoreMXBeanImpl {
> > >     /** The number of logs in each batch. **/
> > >     Rate batchRecordCount;
> > >     /** The size of each batch. **/
> > >     Rate batchSizeCount;
> > > }
> > > ```
> > >
> > > ## Test plan
> > > The test should cover the following cases:
> > >
> > > - The batch mechanism works abides by the total count, total size, and
> > max
> > > delay limitation.
> > > - The returned position for writing data is correct.
> > > - The managedCursor can delete and mark delete the
> > BatchedTransactionMeta.
> > > - Performance tests and compare before-after improvement.
> > >
> > > ## The appendix
> > >
> > > ### <p id="BatchedAddDataCallbackExplains"> Explains why do we need
> > > BatchAddDataCallback  </p>
> > > After all produced messages and acknowledgements to all partitions are
> > > committed or aborted, the TC writes the final COMMITTED or ABORTED
> > > transaction status message to its transaction log, indicating that the
> > > transaction is complete (shown as 3.3a in the diagram). At this point,
> > all
> > > the messages pertaining to the transaction in its transaction log can
> > > safely be removed.
> > > e.g. There are two transactions:
> > > ![截屏2022-06-10 11 56 49](
> > >
> >
> https://user-images.githubusercontent.com/25195800/172987382-fc4ddf9a-e21c-437f-900b-cd681d8d9364.png
> > > )
> > > Transaction Log Write:
> > > ```
> > > transaction_1: start transaction
> > > transaction_2: start transaction
> > > transaction_1: add partition to tx
> > > transaction_1: add subscription to tx
> > > transaction_2: add partition to tx
> > > transaction_1: commit
> > > ```
> > > Bookie Write:
> > > ```
> > > [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> > > ```
> > > Bookie Response:
> > > ```
> > > {ledgerId=2, entryId=3}
> > > ```
> > > Transaction Log callback:
> > > ```
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5,
> > ctx}
> > > ```
> > > The entry(2,3) actually has 6 transaction logs, transaction_1 relations
> > to
> > > [0,2,3,5] and transaction_2 relations to [1,4].
> > > After transaction_1 is committed,the logs [0,2,3,5] of Entry(2,3) are
> not
> > > needed because the transaction has already been completed, but now we
> > could
> > > not delete Entry(2,3), Because the logs [1,4] are still useful that
> > > transaction_2 is not finished and we still need them for the recovery
> > > operation.
> > > The BatchIndex and BatchSize can clearly indicate the location of each
> > > transaction log in the ledger. When the transaction log is no longer
> > used,
> > > Users can accurately delete it according to position and batchIndex.
> > >
> > > ### <p id="librarySupports">Library support for Compatibility with
> older
> > > versions Broker</p>
> > > In broker.conf we can configure the
> > > [transactionMetadataStoreProviderClassName](
> > >
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535
> > )
> > > to replace the implementation of TransactionLog, we can also configure
> > the
> > > [transactionPendingAckStoreProviderClassName](
> > >
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549
> > )
> > > to replace the implementation of PendingAckStore,
> > > We will provide a library containing the classes which can read batched
> > > transaction logs and pending ack logs:
> > >
> > > #### TransactionLogBatchEnryReadableImpl
> > > ```java
> > > public class TransactionLogBatchEnryReadableImpl extends
> > > MLTransactionLogImpl {
> > >     /**
> > >      * Different from the parent class, when this method reads an
> Entry,
> > it
> > > can identify
> > >      * whether the Entry is transction log or batched transaction log.
> If
> > > the Entry is a
> > >     * transaction log, it maintains the same logic as the parent class.
> > If
> > > the transaction log
> > >     * is batched, it will be split into transaction log and processed
> > > according to the original
> > >     * logic
> > >      */
> > >     @Override
> > >     void replayAsync(TransactionLogReplayCallback
> > > transactionLogReplayCallback);
> > > }
> > > ```
> > >
> > > #### PendingAckStoreBatchEntryReadableImpl
> > > ```java
> > > public class PendingAckStoreBatchEntryReadableImpl extends
> > > MLPendingAckStore {
> > >     /**
> > >      * Different from the parent class, when this method reads an
> Entry,
> > it
> > > can identify
> > >      * whether the Entry is pending ack log or batched pending ack log.
> > If
> > > the Entry is a
> > >     * pending ack log, it maintains the same logic as the parent class.
> > If
> > > the pending ack
> > >     * log is batched, it will be split into pending ack log and
> processed
> > > according to the
> > >     * original logic
> > >      */
> > >     void replayAsync(pendingAckHandle, executorService);
> > > }
> > > ```
> > >
> > > How to use this library
> > > 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib
> > > 2. Edit broker.conf. Set transactionMetadataStoreProviderClassName is
> > >
> >
> “org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”,
> > > set transactionPendingAckStoreProviderClassName is
> > >
> >
> “org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
> > > 3. Restart broker.
> >
>

Re: [DISCUSS] [PIP-160] Batch writing ledger for transaction operation

Posted by Yubiao Feng <yu...@streamnative.io.INVALID>.
Hi  Enrico

> I am not sure I understand the part of making it configurable via a
classname.
I believe it is better to simply have a flag "transactionEnableBatchWrites".
Otherwise the matrix of possible implementations will grow without limits.

Good idea, I've modified the design and added a switch in the Configure
Changes section. Could you take a look again.

On Fri, Jun 10, 2022 at 7:14 PM Enrico Olivelli <eo...@gmail.com> wrote:

> I have read the PIP, and overall I agree with the design.
> Good work !
>
> I am not sure I understand the part of making it configurable via a
> classname.
> I believe it is better to simply have a flag
> "transactionEnableBatchWrites".
> Otherwise the matrix of possible implementations will grow without limits.
>
> Enrico
>
> Il giorno ven 10 giu 2022 alle ore 11:35 Yubiao Feng
> <yu...@streamnative.io.invalid> ha scritto:
> >
> > Hi Pulsar community:
> >
> > I open a pip to discuss "Batch writing ledger for transaction operation"
> >
> > Proposal Link: https://github.com/apache/pulsar/issues/15370
> >
> > ## Motivation
> >
> > Before reading the background, I suggest you read section “Transaction
> > Flow” of [PIP-31: Transactional Streaming](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > )
> >
> > ### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow
> </p>
> > ![MG3](
> >
> https://user-images.githubusercontent.com/25195800/172985866-25e496a4-ea93-42ec-aa0d-e6a02aa0635e.jpeg
> > )
> > In *Figure 1. Normal Flow vs. Transaction Flow*:
> > - The gray square boxes represent logical components.
> > - All the blue boxes represent logs. The logs are usually Managed ledger
> > - Each arrow represents the request flow or message flow. These
> operations
> > occur in sequence indicated by the numbers next to each arrow.
> > - The black arrows indicate those shared by transaction and normal flow.
> > - The blue arrows represent normal-message-specific flow.
> > - The orange arrows represent transaction-message-specific flow.
> > - The sections below are numbered to match the operations showed in the
> > diagram(differ from [PIP-31: Transactional Streaming](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > ))
> >
> >
> > #### 2.4a Write logs to ledger which Acknowledgement State is PENDING_ACK
> > [Acknowledgement State Machine](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u
> )
> > tells about the changes of the Acknowledge State and why we need
> persistent
> > “The Log which the Acknowledgement State is PENDING_ACK”.
> > #### 2.4a’ Mark messages is no longer useful with current subscription
> > Update `Cursor` to mark the messages as DELETED. So they can be deleted.
> > #### 3.2b Mark messages is no longer useful with current subscription
> > The implementation here is exactly the same as 2.4a’, except that the
> > execution is triggered later, after the Transaction has been committed.
> >
> >
> > ### Analyze the performance cost of transaction
> > As you can see <a href="#normalFlowVsTransaction">Figure 1. Normal Flow
> vs.
> > Transaction Flow]</a>: 2.4a 'and 3.2b are exactly the same logic, so the
> > remaining orange arrows are the additional performance overhead of all
> > transactions.
> > In terms of whether or not each transaction is executed multiple times,
> we
> > can split the flow into two classes(Optimizing a process that is executed
> > multiple times will yield more benefits):
> > - Executed once each transaction: flow-1.x and flow-3.x
> > - Executed multiple times each transaction: flow-2.x
> >
> > So optimizing the flow 2.x with a lot of execution is a good choice.
> Let's
> > split flow-2.x into two groups: those that cost more and those that cost
> > less:
> > - No disk written: flow-2.1 and fow-2.3
> > - Disk written: fow-2.1a, fow-2.3a, flow-2.4a
> >
> > From the previous analysis, we found that optimizing flow-2.1a,
> flow-2.3a,
> > flow-2.4a would bring the most benefits, and batch writes would be an
> > excellent solution for multiple disk writes. Flow-2.1a and Flow-2.3a are
> > both manipulations written into the transaction log, we can combine them
> in
> > one batch; 2.4a is the operation of writing pending ACK log, we combine
> > multiple 2.4a's into one batch for processing.
> > As we can see from “Transaction Flow” of [PIP-31: Transactional
> Streaming](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> ),
> > these instructions are strictly sequential (guaranteed by the client):
> > - flow-1.x end before flow-2.x start
> > - flow-2.x end before flow-3.x start
> > - flow-3.1a end before flow-3.3a start
> >
> > Therefore, the broker does not need to worry about the dependency of
> these
> > flows, we can also put flow-1a flow-31a and flow-3.3a into The
> Transaction
> > Log Batch too.
> >
> > ## Goal
> > Provide a mechanism for Transaction Log Store and Pending Ack Store:
> accept
> > multiple write requests, buffer all those records, and persist to a
> single
> > BK entry(aka “Batched Entry”). This will improve broker->BK throughput.
> > - Allow users to specify control of the max size, max record of The
> Buffer.
> > - Allow users to specify control max delay time of The Write Request.
> > - Multiple raw data can be recovered from Batched Entry.
> > - Configurable “batched implementation” and “common implementation”
> switch.
> >
> > ## Approach
> > ### Buffer requests and write Bookie
> > Create a new protobuf record called “Batched Transaction Data” with an
> > array inside. When receive a request, we put it in the array.
> >
> > Request:
> > ```
> > [Request 1]{ data, callback }
> > [Request 2]]{ data, callback }
> > …
> > …
> > [Request N]]{ data, callback }
> > ```
> > Buffer:
> > ```
> > [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> > ```
> > Write Bookie:
> > ```
> > LedgerHandle async write ( BatchedTransactionData to byteBuf )
> > LedgerHandle callback: ledgerId=1, entryId=1
> > ```
> > Request-Callback:
> > ```
> > Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > …
> > …
> > Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > ```
> >
> > ### Delete BatchedTransactionMeta
> > [PIP 45](
> >
> https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level
> )
> > has supported batch index delete. So the Raw Data added to a batch can be
> > with different batch indexes but with the same ledger ID and entry ID.
> >
> > Read:
> > ```
> > [BatchedTransactionData]
> > ```
> > After split:
> > ```
> > {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > …
> > {data 3, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > ```
> > Users can delete whole of the batched Entry:
> > ```java
> > cursor.delete( Position {ledgerId = 1, entryId = 1} )
> > ```
> > Users can also delete only part of the batched Entry:
> > ```java
> > cursor.delete( Position {ledgerId =1, entryId = 1, batchIndex=1} )
> > ```
> >
> > ## Changes
> >
> > ### Protocol Changes
> > New protobuf record to buffer requests.
> >
> > BatchedTransactionMetadataEntry
> > ```
> > message BatchedTransactionMetadataEntry{
> >   // Array for buffer transaction log data.
> >   repeated TransactionMetadataEntry transaction_log = 12;
> > }
> > ```
> >
> > BatchedPendingAckMetadataEntry
> > ```
> > message BatchedPendingAckMetadataEntry{
> >   // Array for buffer pending ack data.
> >   repeated PendingAckMetadataEntry pending_ack_log=6;
> > }
> > ```
> >
> > Note: To ensure forward compatibility, we need to distinguish the old
> > TransactionMetadataEntry/PendingAckMetadataEntry data from the new
> > BatchedTransactionData data, and we add A magic number in front of the
> > bytes that proto serializes:
> > ```
> > [Magic Num] [PendingAckMetadataEntry proto bytes]  ==>  [Enrty]
> > ```
> > Read Entry:
> > ```
> >                            /-- true --> [BatchedTransactionMetadataEntry]
> > [Entry] --> has Magic Num ?
> >                            \-- false --> [TransactionMetadataEntry]
> > ```
> >
> >
> > ### API Changes
> >
> > BatchAddDataCallback
> > The Transaction Coordinator does not directly operate the Managed Ledger,
> > uses the Transaction Log Store to operate on Managed Ledger. The Managed
> > Ledger write API provides a callback class: AddEntryCallback, the same
> > Transaction Log Store that provides bulk writes, provides a callback
> class:
> > BatchAddDataCallback. <a href="#BatchedAddDataCallbackExplains">Explains
> > why do we need BatchAddDataCallback </a>.
> >
> > ![WechatIMG7](
> >
> https://user-images.githubusercontent.com/25195800/173034341-8d44a8b1-9dde-45ee-8525-b72365def640.jpeg
> > )
> > Figure.BatchAddDataCallback in Write Flow
> >
> > ```java
> > interface BatchAddDataCallback {
> >     /**
> >      * Successed callback function for “add data asynchronously”
> >      *
> >      * @param posotion A Position is a pointer to a specific entry into
> the
> > managed ledger.
> >      * @param byteBuf The raw data which added.
> >      * @param batchIndex Raw data count in The whole Batched Entry.
> >      * @param batchSize The current raw data index in the batch.
> >      * @param ctx opaque context
> >      */
> >     void addComplete(Position position, ByteBuf byteBuf, int batchIndex,
> > int batchSize, Object context);
> >     /**
> >      * Failure callback function for “add data asynchronously”
> >      *
> >      * @param ctx opaque context
> >      */
> >     void addFailed(ManagedLedgerException exception, Object ctx);
> > }
> > ```
> >
> > ### Configuration Changes
> > Add the Batch threshold parameters to control the refresh frequency.
> >
> > broker.conf
> > ```
> > transactionLogBatchedWriteEnabled = false;
> > transactionLogBatchedWriteMaxRecords= 512;
> > transactionLogBatchedWriteMaxSize= 1024 * 1024 * 4;
> > transactionLogBatchedWriteMaxDelayInMillis= 1;
> >
> > pendingAckBatchedWriteEnabled = false;
> > pendingAckBatchedWriteMaxRecords= 512;
> > pendingAckBatchedWriteMaxSize= 1024 * 1024 * 4;
> > pendingAckBatchedWriteMaxDelayInMillis= 1;
> > ```
> >
> > ### Compatibility
> > After the batch feature is enabled, users can only downgrade to the
> larger
> > than “first version that supports BatchedTransactionMeta reading” to
> > consume data. Data in a lower version broker cannot be parsed, resulting
> in
> > data loss. We also provide <a href="#librarySupports"> Library support
> for
> > Compatibility with older versions Broker</a>, If the user uses this
> library
> > on older version Broker<sup>[0]</sup>, all new data results can be
> > processed correctly and none of the data will be lost.
> >
> > ----
> > **[0]old version Broker**: Not less than 2.9.2 and 2.10
> >
> > ### Observability
> > When using the Batch feature, users will adjust the frequency of disk
> > brushing to achieve the optimal performance. We provide two observable
> > indicators for users' reference
> >
> > ```
> > BatchedDataStoreMXBeanImpl {
> >     /** The number of logs in each batch. **/
> >     Rate batchRecordCount;
> >     /** The size of each batch. **/
> >     Rate batchSizeCount;
> > }
> > ```
> >
> > ## Test plan
> > The test should cover the following cases:
> >
> > - The batch mechanism works abides by the total count, total size, and
> max
> > delay limitation.
> > - The returned position for writing data is correct.
> > - The managedCursor can delete and mark delete the
> BatchedTransactionMeta.
> > - Performance tests and compare before-after improvement.
> >
> > ## The appendix
> >
> > ### <p id="BatchedAddDataCallbackExplains"> Explains why do we need
> > BatchAddDataCallback  </p>
> > After all produced messages and acknowledgements to all partitions are
> > committed or aborted, the TC writes the final COMMITTED or ABORTED
> > transaction status message to its transaction log, indicating that the
> > transaction is complete (shown as 3.3a in the diagram). At this point,
> all
> > the messages pertaining to the transaction in its transaction log can
> > safely be removed.
> > e.g. There are two transactions:
> > ![截屏2022-06-10 11 56 49](
> >
> https://user-images.githubusercontent.com/25195800/172987382-fc4ddf9a-e21c-437f-900b-cd681d8d9364.png
> > )
> > Transaction Log Write:
> > ```
> > transaction_1: start transaction
> > transaction_2: start transaction
> > transaction_1: add partition to tx
> > transaction_1: add subscription to tx
> > transaction_2: add partition to tx
> > transaction_1: commit
> > ```
> > Bookie Write:
> > ```
> > [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> > ```
> > Bookie Response:
> > ```
> > {ledgerId=2, entryId=3}
> > ```
> > Transaction Log callback:
> > ```
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5,
> ctx}
> > ```
> > The entry(2,3) actually has 6 transaction logs, transaction_1 relations
> to
> > [0,2,3,5] and transaction_2 relations to [1,4].
> > After transaction_1 is committed,the logs [0,2,3,5] of Entry(2,3) are not
> > needed because the transaction has already been completed, but now we
> could
> > not delete Entry(2,3), Because the logs [1,4] are still useful that
> > transaction_2 is not finished and we still need them for the recovery
> > operation.
> > The BatchIndex and BatchSize can clearly indicate the location of each
> > transaction log in the ledger. When the transaction log is no longer
> used,
> > Users can accurately delete it according to position and batchIndex.
> >
> > ### <p id="librarySupports">Library support for Compatibility with older
> > versions Broker</p>
> > In broker.conf we can configure the
> > [transactionMetadataStoreProviderClassName](
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535
> )
> > to replace the implementation of TransactionLog, we can also configure
> the
> > [transactionPendingAckStoreProviderClassName](
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549
> )
> > to replace the implementation of PendingAckStore,
> > We will provide a library containing the classes which can read batched
> > transaction logs and pending ack logs:
> >
> > #### TransactionLogBatchEnryReadableImpl
> > ```java
> > public class TransactionLogBatchEnryReadableImpl extends
> > MLTransactionLogImpl {
> >     /**
> >      * Different from the parent class, when this method reads an Entry,
> it
> > can identify
> >      * whether the Entry is transction log or batched transaction log. If
> > the Entry is a
> >     * transaction log, it maintains the same logic as the parent class.
> If
> > the transaction log
> >     * is batched, it will be split into transaction log and processed
> > according to the original
> >     * logic
> >      */
> >     @Override
> >     void replayAsync(TransactionLogReplayCallback
> > transactionLogReplayCallback);
> > }
> > ```
> >
> > #### PendingAckStoreBatchEntryReadableImpl
> > ```java
> > public class PendingAckStoreBatchEntryReadableImpl extends
> > MLPendingAckStore {
> >     /**
> >      * Different from the parent class, when this method reads an Entry,
> it
> > can identify
> >      * whether the Entry is pending ack log or batched pending ack log.
> If
> > the Entry is a
> >     * pending ack log, it maintains the same logic as the parent class.
> If
> > the pending ack
> >     * log is batched, it will be split into pending ack log and processed
> > according to the
> >     * original logic
> >      */
> >     void replayAsync(pendingAckHandle, executorService);
> > }
> > ```
> >
> > How to use this library
> > 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib
> > 2. Edit broker.conf. Set transactionMetadataStoreProviderClassName is
> >
> “org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”,
> > set transactionPendingAckStoreProviderClassName is
> >
> “org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
> > 3. Restart broker.
>

Re: [DISCUSS] [PIP-160] Batch writing ledger for transaction operation

Posted by Yubiao Feng <yu...@streamnative.io.INVALID>.
Hi Pulsar community:

I've rewritten proposal: "Make transactions work more efficiently by
aggregation operation for transaction log and pending ack store"

Proposal Link: https://github.com/apache/pulsar/issues/15370

## Motivation

Transaction coordinator and Pending Ack Store are two core components in
Pulsar transactions. For how they work in Pulsar transactions, see [PIP-31:
Transactional Streaming](
https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
)

Both transaction coordinator and pending ack store use the managed ledger
to record data.The transaction coordinator records the transaction metadata
in a managed ledger(aka transaction log) to achieve transaction durability.
And the pending ack store records all the message ack operations to
guarantee the durability of message ack operations.

Currently, the transaction coordinator will append a new entry to the
transaction log for each transaction metadata/state change. For example,
for the life cycle of a transaction, both of the open transaction, add
produced partition to a transaction, add consumed subscription to a
transaction, and commit the transaction will append to the transaction log.
If there are many parallel transactions, the number of entries added to the
transaction log will also increase.

The pending ack store is similar to the transaction coordinator. Too many
parallel message acks will add more entries to the managed ledger used by
the pending ack store.

So this proposal tries to make the transaction work more efficiently by
adding aggregation operations for transaction log and pending ack store.

## <p id="goalPId">Goal</p>
Provide a mechanism allowing the Transaction Log Store and Pending Ack
Store to aggregate multiple records into a batched record and persist into
a single BK entry. This will make Pulsar transactions work more efficiently.

- Reduce the number of entries written to Bookies
- Reduce the entry indexes size of Bookie
- Reduce the number of entries read from the bookie

The new mechanism will provide the following ability to control the batch.

Adjustable thresholds: trigger BookKeeper-write when reaching any one of
the following conditions
- Max size (bytes)
- Max records count
- Max delay time
- Allow users to enable or disable batch-write.
- Dynamic enable or disable the feature.

## Approach
### Aggregate multiple records and write to Bookies
We will create a Container(aka TxLogBufferedWriter ) to buffer requests and
flush to Managed Ledger. Transaction Log Store And Pending Ack Store will
no longer write to Managed Ledger directly, Change to using
TxLogBufferedWriter to write Ledger data. The TxLogBufferedWriter caches
“write requests” for a certain number(or a certain size of request data)
and then writes them to the Managed Ledger in one go. After Managed Ledger
has written complete, The TxLogBufferedWriter responds to each
request-caller. In this process, Managed Ledger doesn't care how many
records(or what to be written) in the Entry, it just treats them as a
single block of data.

The first write-request by transaction components that write to
TxLogBufferedWriter will take a long time to receive a response because The
TxLogBufferedWriter must wait for subsequent requests to accumulate enough
data to actually start writing to the Managed Ledger. To control the
maximum latency, The TxLogBufferedWriter will mark the first request time
for each batch, and additional timing triggers writes.

The TxLogBufferedWriter does not guarantee that logs in the batch are all
from the same transaction. Instead, it does not matter which transaction
the request belongs to. One batch corresponds to one Managed Ledger
addEntryOp, so the Entry looks like the below:
![截屏2022-06-16 22 35 33](
https://user-images.githubusercontent.com/25195800/174094211-e2145887-c5fd-4cca-a4ab-a5842b92e0ef.png
)

When Transaction completes(committed or aborted), all Transaction log
records and Pending ack log records of this transaction will be
deleted(marked acknowledgment). To ensure that users can delete transaction
records accurately, So when callback to “write request”, The
TxLogBufferedWriter should tell “position(Entry position)”,  “how many
requests there are in a batch( aka batchSize )” and “the location of that
record inside the batch aka batchIndex)” to the caller.

After enabling this feature, some data is non-batch, and some data is batch
which is in Ledger. When batch-data is read, it needs to specifically parse
the data that Transaction Components can process. So we append a magic num
in front of the Batched Log Data to confirm that it is a batched data and
to ensure the future scalability, we will add a version identifier in front
of the original Batched Log Data, just like this:

```
// combine multi records using protobuf record (defined below)
[Record0, Record1, … RecordN ] ⇒ [Records]
// append magic num and version at front of Batched Log (outside protobuf
serialization)
// Magic Num 2 bytes
// Version 2 bytes
[Magic Num] [Version] [Records] ⇒ [Entry]
```

e.g,  Aggregate multiple records and write to Bookies

```
Request:
    → [Log 1]
    → [Log 2]
    → …
    → [Log N]
Aggregate to one:
   → [Log1, Log2 … LogN ] ⇒ [Batched Log]
   → [Magic Num] [version] [Batched Log] ⇒ [Entry]
 Managed Ledger  async write Entry:
   → async write [Entry]
   → callback: {ledgerId=1, entryId=1}
Request-Callback:
   → Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
   → Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
   → …
   → Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
```
—---------------------
Transaction Log Store Logs and Pending Ack Store logs are stored in
different ledgers, so they each hold their own TxLogBufferedWriter objects.

### Acknowledge entry with the aggregated records
In the previous section, we described how to merge multiple logs into a
single block and write them to the Bookie. We also described how
log-records from multiple transactions are mixed into the same Entry, but
each log-record deleted (acknowledged) at a different time (because each
transaction ends at a different time). It is possible that some
transactions have been completed for a long time, and some transactions are
still in progress. In this case, a mechanism is needed to mark each log in
the (now batched) Entry as invalid. When all records are invalid, the Entry
can be marked for deletion.

We can use an existing mechanism([PIP 45 supported batch index delete](
https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level))
to solve this problem. As described in the previous section, after writing
to the transaction log, the transaction component is told two properties to
indicate the specific location of the log in the Entry: batchIndex and
batchSize. Transaction Components could manage the mapping of
transaction-id and log-position after the data has been written(Yes, That's
what it's doing now at [MLTransactionLogImpl](
https://github.com/apache/pulsar/blob/f7635ec6d99bd5a13a31c7e9f17640746afec43c/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java#L230-L245),
 the difference is that the code does not manage the batchSize and
batchIndex. The [MLPendingAckStore](
https://github.com/apache/pulsar/blob/f7635ec6d99bd5a13a31c7e9f17640746afec43c/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java#L250-L274)
uses a more subtle mechanism to maintain log-position). We can rely on
features provided by PIP-45 to identify which logs are ready to be deleted,
and ultimately to delete the whole Entry. When the topic reloads or the
broker crashes, Transaction Components can read logs from the ledger to
rebuild The Mapping, so we do not worry about state loss (in fact, that's
what it's doing now).

Note: TxLogBufferedWriter does not support reading and deleting data.
Transaction Components should control the data reading and deleting by
itself.

## Changes
### API Changes in [CmdTransactions](
https://github.com/apache/pulsar/blob/master/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
)
#### <p id="adminApi1">Enable/disable transaction coordinator batch log</p>
```shell
pulsar-admin transactions –set-txn-tc-batch-log –enable [boolean]
```
#### <p id="adminApi2">Get transaction coordinator batch log stat</p>
```shell
pulsar-admin transactions –get-txn-tc-batch-log-stat
```
Response like this:
```json
{
  "txnTcBatchLogEnabled": true,
  "coodinatorArray": [{
    "1": true
   },
  {
    "2": true
  }]
}
```
Note: If the broker has just restarted and no messages have been processed
using transaction, $.coodinatorArray will be empty.

#### <p id="adminApi3">Enable/disable transaction pending ack batch log</p>
```shell
pulsar-admin transactions –set-txn-pending-ack-batch-log –enable [boolean]
```
#### <p id="adminApi4">Get transaction pending ack batch log stat</p>
```shell
pulsar-admin transactions –get-txn-pending-ack-batch-log-stat
```
Response like this:
```json
{
  "txnPendingAckBatchLogEnabled": true,
  "subscriptionArray": [{
    "my-topic-my-subscription-1": true
   },
  {
    "my-topic-my-subscription-2": true
  }]
}
```

Note: If the broker has just restarted and no messages have been processed
using transaction, $.subscriptionArray will be empty.

### <p id="configPId">Configuration Changes</p>
#### [broker.conf](
https://github.com/apache/pulsar/blob/master/conf/broker.conf)
```properties
transactionLogBatchedWriteEnabled = false;
transactionLogBatchedWriteMaxRecords= 512;
transactionLogBatchedWriteMaxSize= 1024 * 1024 * 4;
transactionLogBatchedWriteMaxDelayInMillis= 1;

transactionPendingAckBatchedWriteEnabled = false;
transactionPendingAckBatchedWriteMaxRecords= 512;
transactionPendingAckBatchedWriteMaxSize= 1024 * 1024 * 4;
transactionPendingAckBatchedWriteMaxDelayInMillis= 1;
```

### Protocol Changes
[PulsarTransactionMetadata.proto](
https://github.com/apache/pulsar/blob/master/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
)
New protobuf record to aggregate multi TransactionMetadataEntry records,
add to an existing file PulsarTransactionMetadata.proto. After
serialization, Magic Num and Version are prefixed with 2 bytes each, and
finally written to Bookie.

```proto
message BatchedTransactionMetadataEntry{
  // Array for buffer transaction log data.
  repeated TransactionMetadataEntry transaction_log = 1;
}
```

[TransactionPendingAck.proto](
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/proto/TransactionPendingAck.proto
)
New protobuf record to aggregate multi PendingAckMetadataEntry records, add
to an existing file TransactionPendingAck.proto. After serialization, Magic
Num and Version are prefixed with 2 bytes each, and finally written to
Bookie.

```proto
message BatchedPendingAckMetadataEntry{
  // Array for buffer pending ack data.
  repeated PendingAckMetadataEntry pending_ack_log=1;
}
```

### Metrics Changes

#### [Pulsar transaction](
https://pulsar.apache.org/docs/next/reference-metrics#pulsar-transaction)
When the batch feature is enabled, the <a
href="#configPId">Configuration</a> needs to be adjusted to optimize
performance. We need to provide many metrics to help users understand the
current state:
The percentage of the triggering actions of each threshold( The definition
of thresholds is explained in <a href="#goalPId" >Goal</a>
- How many logs are in each batch.
- Bytes size of logs per batched log.
- The time of the oldest record was spent in the buffer before being sent.

Here are the new metrics we want to add:
<meta charset="utf-8"><b style="font-weight:normal;"
id="docs-internal-guid-2b140645-7fff-ac51-948b-82b98ee25992"><div dir="ltr"
style="margin-left:0pt;" align="left">

Name | Type | Description | Label
-- | -- | -- | --
pulsar_txn_tc_log_records_count_per_entry | Histogram | count of records in
per transaction log batch.</br>Available bucket:</br>&nbsp; le="10" number
of  transaction logs per batch to be processed between (0, 10]</br>&nbsp;
le="50" number of  transaction logs per batch to be processed between (10,
50]</br>&nbsp; le="100" number of  transaction logs per batch to be
processed between (50, 100]</br>&nbsp; le="200" number of  transaction logs
per batch to be processed between (100, 200]</br>&nbsp; le="500" number of
transaction logs per batch to be processed between (100, 500]</br>&nbsp;
le="1000" number of  transaction logs per batch to be processed between
(500, 1000]</br>&nbsp; le="overflow" number of  transaction logs per batch
to be processed between (1000, ∞ ] | clustercoordinator_id
pulsar_txn_tc_batched_log_entry_size_bytes | Histogram | The added entry
size of a ledger with a given bucket.</br>&nbsp; Available
bucket:</br>&nbsp; le="0_128" is EntrySize between (0byte,
128byte]</br>&nbsp; le="128_512" is EntrySize between (128byte,
512byte]</br>&nbsp; le="512_1024" is EntrySize between (512byte,
1KB]</br>&nbsp; le="1024_2048" is EntrySize between (1KB, 2KB]</br>&nbsp;
le="2048_4096" is EntrySize between (2KB, 4KB]le="4096_16384" is EntrySize
between (4KB, 16KB]</br>&nbsp; le="16384_12400" is EntrySize between (16KB,
100KB]</br>&nbsp; le="12400_1232896" is EntrySize between (100KB,
1MB]</br>&nbsp; le="overflow" is EntrySize between (1MB, ∞ ] |
clustercoordinator_id
pulsar_txn_tc_batched_log_olderst_record_delay_time_seconds | Histogram |
The time of the oldest transaction log spent in the buffer before being
sent.</br>Available bucket:</br>&nbsp; le="1" time of the oldest
transaction log spent in the buffer before being sent between (0s,
0.001s]</br>&nbsp; le="5" time of the oldest transaction log spent in the
buffer before being sent between (0.001s, 0.005s]</br>&nbsp; le="10" time
of the oldest transaction log spent in the buffer before being sent between
(0.001s, 0.01s]</br>&nbsp; le="overflow" number of  transaction logs per
batch to be processed between (0.01s, ∞ ] | clustercoordinator_id
pulsar_txn_tc_batched_log_triggering_count_by_records | Counter | The count
of the triggering transaction log batch flush actions by
${transactionLogBatchedWriteMaxRecords} | clustercoordinator_id
pulsar_txn_tc_batched_log_triggering_count_by_size | Counter | The count of
the triggering transaction log batch flush actions by
${transactionLogBatchedWriteMaxSize} | clustercoordinator_id
pulsar_txn_tc_batched_log_triggering_count_by_delay_time | Counter | The
count of the triggering transaction log batch flush actions by
${transactionLogBatchedWriteMaxDelayInMillis} | clustercoordinator_id
pulsar_pending_ack_batched_log_records_count_per_entry | Histogram | count
of records in per pending ack log batch.</br>Available bucket:</br>&nbsp;
le="10" number of  pending ack logs per batch to be processed between (0,
10]</br>&nbsp; le="50" number of  pending ack logs per batch to be
processed between (10, 50]</br>&nbsp; le="100" number of  pending ack logs
per batch to be processed between (50, 100]</br>&nbsp; le="200" number of
transaction logs per batch to be processed between (100, 200]</br>&nbsp;
le="500" number of  pending ack logs per batch to be processed between
(100, 500]</br>&nbsp; le="1000" number of  pending ack logs per batch to be
processed between (500, 1000]</br>&nbsp; le="overflow" number of  pending
ack logs per batch to be processed between (1000, ∞ ] | cluster
pulsar_pending_ack_batched_log_entry_size_bytes |   | The added entry size
of a ledger with a given bucket.</br>Available bucket:</br>&nbsp;
le="0_128" is EntrySize between (0byte, 128byte]</br>&nbsp; le="128_512" is
EntrySize between (128byte, 512byte]</br>&nbsp; le="512_1024" is EntrySize
between (512byte, 1KB]</br>&nbsp; le="1024_2048" is EntrySize between (1KB,
2KB]</br>&nbsp; le="2048_4096" is EntrySize between (2KB, 4KB]</br>&nbsp;
le="4096_16384" is EntrySize between (4KB, 16KB]</br>&nbsp;
le="16384_12400" is EntrySize between (16KB, 100KB]</br>&nbsp;
le="12400_1232896" is EntrySize between (100KB, 1MB]</br>&nbsp;
le="overflow" is EntrySize between (1MB, ∞ ] |
pulsar_pending_ack_batched_log_olderst_record_delay_time_seconds |
Histogram | The time of the oldest pending ack log spent in the buffer
before being sent.</br>Available bucket:</br>&nbsp; le="1" time of the
oldest pending ack log spent in the buffer before being sent between (0s,
0.001s]</br>&nbsp; le="5" time of the oldest pending ack log spent in the
buffer before being sent between (0.001s, 0.005s]</br>&nbsp; le="10" time
of the oldest pending ack log spent in the buffer before being sent between
(0.005s, 0.01s]</br>&nbsp; le="overflow" number of  pending ack logs per
batch to be processed between (0.005s, ∞ ] | cluster
pulsar_pending_ack_batched_log_triggering_count_by_records | Counter | The
count of the triggering pending ack log batch flush actions by
${transactionPendingAckBatchedWriteMaxRecords} | cluster
pulsar_pending_ack_batched_log_triggering_count_by_size | Counter | The
count of the triggering pending ack log batch flush actions by
${transactionPendingAckBatchedWriteMaxSize} | cluster
pulsar_pending_ack_batched_log_triggering_count_by_delay_time | Counter |
The count of the triggering pending ack log batch flush actions by
${transactionPendingAckBatchedWriteMaxDelayInMillis} | cluster

</div></b>

Label Description
- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name
that you have configured in the broker.conf file.
- coordinator_id: coordinator_id=${coordinator_id}. ${coordinator_id} is
the transaction coordinator id.

## Compatibility
This feature is not forward compatible because the new feature writes new
data structures to ledger that the old version of the broker could not
parse. To ensure that users can upgrade and downgrade broker versions
smoothly, we provide the following sections:

### Upgrade
After this feature is released, users can upgrade to the new version (since
this feature is turned off by default). Enabling this feature will write
new data to Bookie that the old version of the Broker cannot resolve, so do
not enable this feature if any nodes in the cluster have not been upgraded
to the new version. If you want to use this feature, the correct steps
would look like this:
Rolling upgrade broker nodes to the new version, keep batch-log feature
disabled.
1. Waiting for all broker nodes have been upgraded finish.
2. Rolling enable the batch-log feature of all brokers. see: <a
href="#adminApi1">Enable/disable transaction coordinator batch log</a> and
<a href="#adminApi3">Enable/disable transaction pending ack batch log</a>.
4. Waiting for all broker nodes have been enabled batch-log feature.
5. Ensure all transaction coordinators and all pending ack stores has been
enabled the feature. see: <a href="#adminApi2">Get transaction coordinator
batch log stat</a> and <a href="#adminApi4">Get transaction pending ack
batch log stat</a>

### Downgrade
After the batch feature is enabled,  *if users need to use an older version
of the broker, please strictly follow this step*(the old version of the
broker cannot recognize the new data with new format):
1. Disable the Batch Transaction Log feature for all broker nodes.
  - Disabled feature. see: <a href="#adminApi1">Enable/disable transaction
coordinator batch log</a> and <a href="#adminApi3">Enable/disable
transaction pending ack batch log</a>.
  -  Ensure all transaction coordinators and all pending ack stores has
been disabled the feature. see: <a href="#adminApi2">Get transaction
coordinator batch log stat</a> and <a href="#adminApi4">Get transaction
pending ack batch log stat</a>
2. Wait for all batch logs to be invalidated(this process will be quick,
depending on transaction Max timeout and managed ledger max rollover time).
*2-1. lookup coordinator id list*

```
 ./pulsar-admin topics partitioned-lookup
persistent://pulsar/system/transaction_coordinator_assign
```

Response like this:

```
persistent://pulsar/system/transaction_coordinator_assign-partition-0
 pulsar://127.0.0.1:6650
persistent://pulsar/system/transaction_coordinator_assign-partition-1
 pulsar://127.0.0.1:6650
persistent://pulsar/system/transaction_coordinator_assign-partition-2
 pulsar://127.0.0.1:6650
persistent://pulsar/system/transaction_coordinator_assign-partition-3
 pulsar://127.0.0.1:6650
```

In this response, 0, 1, 2, and 3 are all coordinator ids

*2-2. View the ledger list of the current transaction meta log*

```
./pulsar-admin transactions coordinator-internal-stats --coordinator-id
[coordinator-id]
```

The properties of $.transactionLogStats.ledgers of Response like this:
```
    [{
        "ledgerId" : 48,
        "entries" : 10,
        "size" : 20,
        "offloaded" : false,
        "underReplicated" : false
      },{
        "ledgerId" : 49,
        "entries" : 10,
        "size" : 20,
        "offloaded" : false,
        "underReplicated" : false
      }]
```

*Note-1: The smallest ledgerId of ledgers means the oldest transaction log
ledger which has not been deleted (aka OldestExistsTransactionLogLedger)*

*Note-2: When query transaction stats immediately after disabled this
feature, The largest ledgerId of ledgers means the last batched transaction
log ledger (aka LastBatchedTransactionLogLedger)*

*2-3. Perform step 2-2 for all coordinator ids and remember all
LastBatchedTransactionLogLedger.*

*2-4. Find all subscription names using the transaction feature.*

List all topics in the specified namespace.

```
./pulsar-admin namespaces topics public/default
```

Response like this:

```
persistent://public/default/__change_events
persistent://public/default/__transaction_buffer_snapshot
persistent://public/default/my-topic
persistent://public/default/my-topic-my-subscription__transaction_pending_ack
```

The topic name ending with `_pending_ack` is actually created by the
subscription using transaction feature, this topic named by rule:

```
 {target topic name}-{subscription-name}__transaction_pending_ack
```

*2-5. View the ledger list of the current pending ack log*

```
./pulsar-admin transactions pending-ack-internal-stats --topic [topic]
--sub-name [subscription name]
```

The properties of $.pendingAckLogStats.ledgers of Response like this:

```
 [{
    "ledgerId" : 79,
    "entries" : 10,
    "size" : 20,
    "offloaded" : false,
    "underReplicated" : false
  },{
    "ledgerId" : 80,
    "entries" : 10,
    "size" : 20,
    "offloaded" : false,
    "underReplicated" : false
  }]
```

*Note-1: The smallest ledgerId of ledgers means the oldest pending ack log
ledger which has not been deleted (aka OldestExistsPendingAckLogLedger)*

*Note-2: When query transaction stats immediately after disabled this
feature, The largest ledgerId of ledgers means the last batched transaction
log ledger (aka LastBatchedPendingAckLogLedger)*

*2-6. Perform step 2-5 for all subscriptions which use transactions feature
and remember all LastBatchedPendingAckLogLedger.*

*2-7. Wait until OldestExistsTransactionLogLedger is greater than
LastBatchedTransactionLogLedger already remember, and Wait until
OldestExistsPendingAckLogLedger is greater than
LastBatchedPendingAckLogLedger already remember. That means all batch logs
to be invalidated.*

3. Downgrade all broker nodes to the previous version.

## Test plan
The test should cover the following cases:

- Aggregate multiple records and write to Bookies is correct, and the
returned position, batchSize, batchIndex for writing data is correct.
- The batch mechanism works abides by the total count, total size, and max
delay limitation.
- Delete Batched Transaction Log is correct after transaction end.
- Metrics data is correct.
- Performance tests and compare before-after improvement.

Thanks
Yubiao.Feng

On Fri, Jun 10, 2022 at 9:19 PM Xiangying Meng <xi...@apache.org> wrote:

> I think this is a nice optimization of transaction internal components.
> But I have two little questions:
> 1. I noticed you write conf transactionMetadataStoreProviderClassName is
> configured in broker.conf. Should this be added to `Configuration Changes`?
> 2. This proposal will add a batch mechanism for pending ack, but this
> proposal is no detailed description for pending ack similar to the
> description of metadata log batch.
>
> On Fri, Jun 10, 2022 at 7:15 PM Enrico Olivelli <eo...@gmail.com>
> wrote:
>
> > I have read the PIP, and overall I agree with the design.
> > Good work !
> >
> > I am not sure I understand the part of making it configurable via a
> > classname.
> > I believe it is better to simply have a flag
> > "transactionEnableBatchWrites".
> > Otherwise the matrix of possible implementations will grow without
> limits.
> >
> > Enrico
> >
> > Il giorno ven 10 giu 2022 alle ore 11:35 Yubiao Feng
> > <yu...@streamnative.io.invalid> ha scritto:
> > >
> > > Hi Pulsar community:
> > >
> > > I open a pip to discuss "Batch writing ledger for transaction
> operation"
> > >
> > > Proposal Link: https://github.com/apache/pulsar/issues/15370
> > >
> > > ## Motivation
> > >
> > > Before reading the background, I suggest you read section “Transaction
> > > Flow” of [PIP-31: Transactional Streaming](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > )
> > >
> > > ### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow
> > </p>
> > > ![MG3](
> > >
> >
> https://user-images.githubusercontent.com/25195800/172985866-25e496a4-ea93-42ec-aa0d-e6a02aa0635e.jpeg
> > > )
> > > In *Figure 1. Normal Flow vs. Transaction Flow*:
> > > - The gray square boxes represent logical components.
> > > - All the blue boxes represent logs. The logs are usually Managed
> ledger
> > > - Each arrow represents the request flow or message flow. These
> > operations
> > > occur in sequence indicated by the numbers next to each arrow.
> > > - The black arrows indicate those shared by transaction and normal
> flow.
> > > - The blue arrows represent normal-message-specific flow.
> > > - The orange arrows represent transaction-message-specific flow.
> > > - The sections below are numbered to match the operations showed in the
> > > diagram(differ from [PIP-31: Transactional Streaming](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > ))
> > >
> > >
> > > #### 2.4a Write logs to ledger which Acknowledgement State is
> PENDING_ACK
> > > [Acknowledgement State Machine](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u
> > )
> > > tells about the changes of the Acknowledge State and why we need
> > persistent
> > > “The Log which the Acknowledgement State is PENDING_ACK”.
> > > #### 2.4a’ Mark messages is no longer useful with current subscription
> > > Update `Cursor` to mark the messages as DELETED. So they can be
> deleted.
> > > #### 3.2b Mark messages is no longer useful with current subscription
> > > The implementation here is exactly the same as 2.4a’, except that the
> > > execution is triggered later, after the Transaction has been committed.
> > >
> > >
> > > ### Analyze the performance cost of transaction
> > > As you can see <a href="#normalFlowVsTransaction">Figure 1. Normal Flow
> > vs.
> > > Transaction Flow]</a>: 2.4a 'and 3.2b are exactly the same logic, so
> the
> > > remaining orange arrows are the additional performance overhead of all
> > > transactions.
> > > In terms of whether or not each transaction is executed multiple times,
> > we
> > > can split the flow into two classes(Optimizing a process that is
> executed
> > > multiple times will yield more benefits):
> > > - Executed once each transaction: flow-1.x and flow-3.x
> > > - Executed multiple times each transaction: flow-2.x
> > >
> > > So optimizing the flow 2.x with a lot of execution is a good choice.
> > Let's
> > > split flow-2.x into two groups: those that cost more and those that
> cost
> > > less:
> > > - No disk written: flow-2.1 and fow-2.3
> > > - Disk written: fow-2.1a, fow-2.3a, flow-2.4a
> > >
> > > From the previous analysis, we found that optimizing flow-2.1a,
> > flow-2.3a,
> > > flow-2.4a would bring the most benefits, and batch writes would be an
> > > excellent solution for multiple disk writes. Flow-2.1a and Flow-2.3a
> are
> > > both manipulations written into the transaction log, we can combine
> them
> > in
> > > one batch; 2.4a is the operation of writing pending ACK log, we combine
> > > multiple 2.4a's into one batch for processing.
> > > As we can see from “Transaction Flow” of [PIP-31: Transactional
> > Streaming](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > ),
> > > these instructions are strictly sequential (guaranteed by the client):
> > > - flow-1.x end before flow-2.x start
> > > - flow-2.x end before flow-3.x start
> > > - flow-3.1a end before flow-3.3a start
> > >
> > > Therefore, the broker does not need to worry about the dependency of
> > these
> > > flows, we can also put flow-1a flow-31a and flow-3.3a into The
> > Transaction
> > > Log Batch too.
> > >
> > > ## Goal
> > > Provide a mechanism for Transaction Log Store and Pending Ack Store:
> > accept
> > > multiple write requests, buffer all those records, and persist to a
> > single
> > > BK entry(aka “Batched Entry”). This will improve broker->BK throughput.
> > > - Allow users to specify control of the max size, max record of The
> > Buffer.
> > > - Allow users to specify control max delay time of The Write Request.
> > > - Multiple raw data can be recovered from Batched Entry.
> > > - Configurable “batched implementation” and “common implementation”
> > switch.
> > >
> > > ## Approach
> > > ### Buffer requests and write Bookie
> > > Create a new protobuf record called “Batched Transaction Data” with an
> > > array inside. When receive a request, we put it in the array.
> > >
> > > Request:
> > > ```
> > > [Request 1]{ data, callback }
> > > [Request 2]]{ data, callback }
> > > …
> > > …
> > > [Request N]]{ data, callback }
> > > ```
> > > Buffer:
> > > ```
> > > [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> > > ```
> > > Write Bookie:
> > > ```
> > > LedgerHandle async write ( BatchedTransactionData to byteBuf )
> > > LedgerHandle callback: ledgerId=1, entryId=1
> > > ```
> > > Request-Callback:
> > > ```
> > > Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > > Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > > …
> > > …
> > > Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > > ```
> > >
> > > ### Delete BatchedTransactionMeta
> > > [PIP 45](
> > >
> >
> https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level
> > )
> > > has supported batch index delete. So the Raw Data added to a batch can
> be
> > > with different batch indexes but with the same ledger ID and entry ID.
> > >
> > > Read:
> > > ```
> > > [BatchedTransactionData]
> > > ```
> > > After split:
> > > ```
> > > {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > > {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > > …
> > > {data 3, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > > ```
> > > Users can delete whole of the batched Entry:
> > > ```java
> > > cursor.delete( Position {ledgerId = 1, entryId = 1} )
> > > ```
> > > Users can also delete only part of the batched Entry:
> > > ```java
> > > cursor.delete( Position {ledgerId =1, entryId = 1, batchIndex=1} )
> > > ```
> > >
> > > ## Changes
> > >
> > > ### Protocol Changes
> > > New protobuf record to buffer requests.
> > >
> > > BatchedTransactionMetadataEntry
> > > ```
> > > message BatchedTransactionMetadataEntry{
> > >   // Array for buffer transaction log data.
> > >   repeated TransactionMetadataEntry transaction_log = 12;
> > > }
> > > ```
> > >
> > > BatchedPendingAckMetadataEntry
> > > ```
> > > message BatchedPendingAckMetadataEntry{
> > >   // Array for buffer pending ack data.
> > >   repeated PendingAckMetadataEntry pending_ack_log=6;
> > > }
> > > ```
> > >
> > > Note: To ensure forward compatibility, we need to distinguish the old
> > > TransactionMetadataEntry/PendingAckMetadataEntry data from the new
> > > BatchedTransactionData data, and we add A magic number in front of the
> > > bytes that proto serializes:
> > > ```
> > > [Magic Num] [PendingAckMetadataEntry proto bytes]  ==>  [Enrty]
> > > ```
> > > Read Entry:
> > > ```
> > >                            /-- true -->
> [BatchedTransactionMetadataEntry]
> > > [Entry] --> has Magic Num ?
> > >                            \-- false --> [TransactionMetadataEntry]
> > > ```
> > >
> > >
> > > ### API Changes
> > >
> > > BatchAddDataCallback
> > > The Transaction Coordinator does not directly operate the Managed
> Ledger,
> > > uses the Transaction Log Store to operate on Managed Ledger. The
> Managed
> > > Ledger write API provides a callback class: AddEntryCallback, the same
> > > Transaction Log Store that provides bulk writes, provides a callback
> > class:
> > > BatchAddDataCallback. <a
> href="#BatchedAddDataCallbackExplains">Explains
> > > why do we need BatchAddDataCallback </a>.
> > >
> > > ![WechatIMG7](
> > >
> >
> https://user-images.githubusercontent.com/25195800/173034341-8d44a8b1-9dde-45ee-8525-b72365def640.jpeg
> > > )
> > > Figure.BatchAddDataCallback in Write Flow
> > >
> > > ```java
> > > interface BatchAddDataCallback {
> > >     /**
> > >      * Successed callback function for “add data asynchronously”
> > >      *
> > >      * @param posotion A Position is a pointer to a specific entry into
> > the
> > > managed ledger.
> > >      * @param byteBuf The raw data which added.
> > >      * @param batchIndex Raw data count in The whole Batched Entry.
> > >      * @param batchSize The current raw data index in the batch.
> > >      * @param ctx opaque context
> > >      */
> > >     void addComplete(Position position, ByteBuf byteBuf, int
> batchIndex,
> > > int batchSize, Object context);
> > >     /**
> > >      * Failure callback function for “add data asynchronously”
> > >      *
> > >      * @param ctx opaque context
> > >      */
> > >     void addFailed(ManagedLedgerException exception, Object ctx);
> > > }
> > > ```
> > >
> > > ### Configuration Changes
> > > Add the Batch threshold parameters to control the refresh frequency.
> > >
> > > broker.conf
> > > ```
> > > transactionLogBatchedWriteEnabled = false;
> > > transactionLogBatchedWriteMaxRecords= 512;
> > > transactionLogBatchedWriteMaxSize= 1024 * 1024 * 4;
> > > transactionLogBatchedWriteMaxDelayInMillis= 1;
> > >
> > > pendingAckBatchedWriteEnabled = false;
> > > pendingAckBatchedWriteMaxRecords= 512;
> > > pendingAckBatchedWriteMaxSize= 1024 * 1024 * 4;
> > > pendingAckBatchedWriteMaxDelayInMillis= 1;
> > > ```
> > >
> > > ### Compatibility
> > > After the batch feature is enabled, users can only downgrade to the
> > larger
> > > than “first version that supports BatchedTransactionMeta reading” to
> > > consume data. Data in a lower version broker cannot be parsed,
> resulting
> > in
> > > data loss. We also provide <a href="#librarySupports"> Library support
> > for
> > > Compatibility with older versions Broker</a>, If the user uses this
> > library
> > > on older version Broker<sup>[0]</sup>, all new data results can be
> > > processed correctly and none of the data will be lost.
> > >
> > > ----
> > > **[0]old version Broker**: Not less than 2.9.2 and 2.10
> > >
> > > ### Observability
> > > When using the Batch feature, users will adjust the frequency of disk
> > > brushing to achieve the optimal performance. We provide two observable
> > > indicators for users' reference
> > >
> > > ```
> > > BatchedDataStoreMXBeanImpl {
> > >     /** The number of logs in each batch. **/
> > >     Rate batchRecordCount;
> > >     /** The size of each batch. **/
> > >     Rate batchSizeCount;
> > > }
> > > ```
> > >
> > > ## Test plan
> > > The test should cover the following cases:
> > >
> > > - The batch mechanism works abides by the total count, total size, and
> > max
> > > delay limitation.
> > > - The returned position for writing data is correct.
> > > - The managedCursor can delete and mark delete the
> > BatchedTransactionMeta.
> > > - Performance tests and compare before-after improvement.
> > >
> > > ## The appendix
> > >
> > > ### <p id="BatchedAddDataCallbackExplains"> Explains why do we need
> > > BatchAddDataCallback  </p>
> > > After all produced messages and acknowledgements to all partitions are
> > > committed or aborted, the TC writes the final COMMITTED or ABORTED
> > > transaction status message to its transaction log, indicating that the
> > > transaction is complete (shown as 3.3a in the diagram). At this point,
> > all
> > > the messages pertaining to the transaction in its transaction log can
> > > safely be removed.
> > > e.g. There are two transactions:
> > > ![截屏2022-06-10 11 56 49](
> > >
> >
> https://user-images.githubusercontent.com/25195800/172987382-fc4ddf9a-e21c-437f-900b-cd681d8d9364.png
> > > )
> > > Transaction Log Write:
> > > ```
> > > transaction_1: start transaction
> > > transaction_2: start transaction
> > > transaction_1: add partition to tx
> > > transaction_1: add subscription to tx
> > > transaction_2: add partition to tx
> > > transaction_1: commit
> > > ```
> > > Bookie Write:
> > > ```
> > > [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> > > ```
> > > Bookie Response:
> > > ```
> > > {ledgerId=2, entryId=3}
> > > ```
> > > Transaction Log callback:
> > > ```
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5,
> > ctx}
> > > ```
> > > The entry(2,3) actually has 6 transaction logs, transaction_1 relations
> > to
> > > [0,2,3,5] and transaction_2 relations to [1,4].
> > > After transaction_1 is committed,the logs [0,2,3,5] of Entry(2,3) are
> not
> > > needed because the transaction has already been completed, but now we
> > could
> > > not delete Entry(2,3), Because the logs [1,4] are still useful that
> > > transaction_2 is not finished and we still need them for the recovery
> > > operation.
> > > The BatchIndex and BatchSize can clearly indicate the location of each
> > > transaction log in the ledger. When the transaction log is no longer
> > used,
> > > Users can accurately delete it according to position and batchIndex.
> > >
> > > ### <p id="librarySupports">Library support for Compatibility with
> older
> > > versions Broker</p>
> > > In broker.conf we can configure the
> > > [transactionMetadataStoreProviderClassName](
> > >
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535
> > )
> > > to replace the implementation of TransactionLog, we can also configure
> > the
> > > [transactionPendingAckStoreProviderClassName](
> > >
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549
> > )
> > > to replace the implementation of PendingAckStore,
> > > We will provide a library containing the classes which can read batched
> > > transaction logs and pending ack logs:
> > >
> > > #### TransactionLogBatchEnryReadableImpl
> > > ```java
> > > public class TransactionLogBatchEnryReadableImpl extends
> > > MLTransactionLogImpl {
> > >     /**
> > >      * Different from the parent class, when this method reads an
> Entry,
> > it
> > > can identify
> > >      * whether the Entry is transction log or batched transaction log.
> If
> > > the Entry is a
> > >     * transaction log, it maintains the same logic as the parent class.
> > If
> > > the transaction log
> > >     * is batched, it will be split into transaction log and processed
> > > according to the original
> > >     * logic
> > >      */
> > >     @Override
> > >     void replayAsync(TransactionLogReplayCallback
> > > transactionLogReplayCallback);
> > > }
> > > ```
> > >
> > > #### PendingAckStoreBatchEntryReadableImpl
> > > ```java
> > > public class PendingAckStoreBatchEntryReadableImpl extends
> > > MLPendingAckStore {
> > >     /**
> > >      * Different from the parent class, when this method reads an
> Entry,
> > it
> > > can identify
> > >      * whether the Entry is pending ack log or batched pending ack log.
> > If
> > > the Entry is a
> > >     * pending ack log, it maintains the same logic as the parent class.
> > If
> > > the pending ack
> > >     * log is batched, it will be split into pending ack log and
> processed
> > > according to the
> > >     * original logic
> > >      */
> > >     void replayAsync(pendingAckHandle, executorService);
> > > }
> > > ```
> > >
> > > How to use this library
> > > 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib
> > > 2. Edit broker.conf. Set transactionMetadataStoreProviderClassName is
> > >
> >
> “org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”,
> > > set transactionPendingAckStoreProviderClassName is
> > >
> >
> “org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
> > > 3. Restart broker.
> >
>

Re: [DISCUSS] [PIP-160] Batch writing ledger for transaction operation

Posted by Yubiao Feng <yu...@streamnative.io.INVALID>.
Hi XiangYing
>  I noticed you write conf transactionMetadataStoreProviderClassName is
configured in broker.conf. Should this be added to `Configuration Changes`?

I have rewritten the design, in the new design I removed the section which
has "transactionMetadataStoreProviderClassName", instead it, I write
new sections: "Upgrade" and "Downgrade".

> This proposal will add a batch mechanism for pending ack, but this
proposal is no detailed description for pending ack similar to the
description of metadata log batch.

In the new design, I have appended "detail of pending ack" at section
"Acknowledge entry with the aggregated records".


On Fri, Jun 10, 2022 at 9:19 PM Xiangying Meng <xi...@apache.org> wrote:

> I think this is a nice optimization of transaction internal components.
> But I have two little questions:
> 1. I noticed you write conf transactionMetadataStoreProviderClassName is
> configured in broker.conf. Should this be added to `Configuration Changes`?
> 2. This proposal will add a batch mechanism for pending ack, but this
> proposal is no detailed description for pending ack similar to the
> description of metadata log batch.
>
> On Fri, Jun 10, 2022 at 7:15 PM Enrico Olivelli <eo...@gmail.com>
> wrote:
>
> > I have read the PIP, and overall I agree with the design.
> > Good work !
> >
> > I am not sure I understand the part of making it configurable via a
> > classname.
> > I believe it is better to simply have a flag
> > "transactionEnableBatchWrites".
> > Otherwise the matrix of possible implementations will grow without
> limits.
> >
> > Enrico
> >
> > Il giorno ven 10 giu 2022 alle ore 11:35 Yubiao Feng
> > <yu...@streamnative.io.invalid> ha scritto:
> > >
> > > Hi Pulsar community:
> > >
> > > I open a pip to discuss "Batch writing ledger for transaction
> operation"
> > >
> > > Proposal Link: https://github.com/apache/pulsar/issues/15370
> > >
> > > ## Motivation
> > >
> > > Before reading the background, I suggest you read section “Transaction
> > > Flow” of [PIP-31: Transactional Streaming](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > )
> > >
> > > ### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow
> > </p>
> > > ![MG3](
> > >
> >
> https://user-images.githubusercontent.com/25195800/172985866-25e496a4-ea93-42ec-aa0d-e6a02aa0635e.jpeg
> > > )
> > > In *Figure 1. Normal Flow vs. Transaction Flow*:
> > > - The gray square boxes represent logical components.
> > > - All the blue boxes represent logs. The logs are usually Managed
> ledger
> > > - Each arrow represents the request flow or message flow. These
> > operations
> > > occur in sequence indicated by the numbers next to each arrow.
> > > - The black arrows indicate those shared by transaction and normal
> flow.
> > > - The blue arrows represent normal-message-specific flow.
> > > - The orange arrows represent transaction-message-specific flow.
> > > - The sections below are numbered to match the operations showed in the
> > > diagram(differ from [PIP-31: Transactional Streaming](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > ))
> > >
> > >
> > > #### 2.4a Write logs to ledger which Acknowledgement State is
> PENDING_ACK
> > > [Acknowledgement State Machine](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u
> > )
> > > tells about the changes of the Acknowledge State and why we need
> > persistent
> > > “The Log which the Acknowledgement State is PENDING_ACK”.
> > > #### 2.4a’ Mark messages is no longer useful with current subscription
> > > Update `Cursor` to mark the messages as DELETED. So they can be
> deleted.
> > > #### 3.2b Mark messages is no longer useful with current subscription
> > > The implementation here is exactly the same as 2.4a’, except that the
> > > execution is triggered later, after the Transaction has been committed.
> > >
> > >
> > > ### Analyze the performance cost of transaction
> > > As you can see <a href="#normalFlowVsTransaction">Figure 1. Normal Flow
> > vs.
> > > Transaction Flow]</a>: 2.4a 'and 3.2b are exactly the same logic, so
> the
> > > remaining orange arrows are the additional performance overhead of all
> > > transactions.
> > > In terms of whether or not each transaction is executed multiple times,
> > we
> > > can split the flow into two classes(Optimizing a process that is
> executed
> > > multiple times will yield more benefits):
> > > - Executed once each transaction: flow-1.x and flow-3.x
> > > - Executed multiple times each transaction: flow-2.x
> > >
> > > So optimizing the flow 2.x with a lot of execution is a good choice.
> > Let's
> > > split flow-2.x into two groups: those that cost more and those that
> cost
> > > less:
> > > - No disk written: flow-2.1 and fow-2.3
> > > - Disk written: fow-2.1a, fow-2.3a, flow-2.4a
> > >
> > > From the previous analysis, we found that optimizing flow-2.1a,
> > flow-2.3a,
> > > flow-2.4a would bring the most benefits, and batch writes would be an
> > > excellent solution for multiple disk writes. Flow-2.1a and Flow-2.3a
> are
> > > both manipulations written into the transaction log, we can combine
> them
> > in
> > > one batch; 2.4a is the operation of writing pending ACK log, we combine
> > > multiple 2.4a's into one batch for processing.
> > > As we can see from “Transaction Flow” of [PIP-31: Transactional
> > Streaming](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > ),
> > > these instructions are strictly sequential (guaranteed by the client):
> > > - flow-1.x end before flow-2.x start
> > > - flow-2.x end before flow-3.x start
> > > - flow-3.1a end before flow-3.3a start
> > >
> > > Therefore, the broker does not need to worry about the dependency of
> > these
> > > flows, we can also put flow-1a flow-31a and flow-3.3a into The
> > Transaction
> > > Log Batch too.
> > >
> > > ## Goal
> > > Provide a mechanism for Transaction Log Store and Pending Ack Store:
> > accept
> > > multiple write requests, buffer all those records, and persist to a
> > single
> > > BK entry(aka “Batched Entry”). This will improve broker->BK throughput.
> > > - Allow users to specify control of the max size, max record of The
> > Buffer.
> > > - Allow users to specify control max delay time of The Write Request.
> > > - Multiple raw data can be recovered from Batched Entry.
> > > - Configurable “batched implementation” and “common implementation”
> > switch.
> > >
> > > ## Approach
> > > ### Buffer requests and write Bookie
> > > Create a new protobuf record called “Batched Transaction Data” with an
> > > array inside. When receive a request, we put it in the array.
> > >
> > > Request:
> > > ```
> > > [Request 1]{ data, callback }
> > > [Request 2]]{ data, callback }
> > > …
> > > …
> > > [Request N]]{ data, callback }
> > > ```
> > > Buffer:
> > > ```
> > > [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> > > ```
> > > Write Bookie:
> > > ```
> > > LedgerHandle async write ( BatchedTransactionData to byteBuf )
> > > LedgerHandle callback: ledgerId=1, entryId=1
> > > ```
> > > Request-Callback:
> > > ```
> > > Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > > Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > > …
> > > …
> > > Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > > ```
> > >
> > > ### Delete BatchedTransactionMeta
> > > [PIP 45](
> > >
> >
> https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level
> > )
> > > has supported batch index delete. So the Raw Data added to a batch can
> be
> > > with different batch indexes but with the same ledger ID and entry ID.
> > >
> > > Read:
> > > ```
> > > [BatchedTransactionData]
> > > ```
> > > After split:
> > > ```
> > > {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > > {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > > …
> > > {data 3, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > > ```
> > > Users can delete whole of the batched Entry:
> > > ```java
> > > cursor.delete( Position {ledgerId = 1, entryId = 1} )
> > > ```
> > > Users can also delete only part of the batched Entry:
> > > ```java
> > > cursor.delete( Position {ledgerId =1, entryId = 1, batchIndex=1} )
> > > ```
> > >
> > > ## Changes
> > >
> > > ### Protocol Changes
> > > New protobuf record to buffer requests.
> > >
> > > BatchedTransactionMetadataEntry
> > > ```
> > > message BatchedTransactionMetadataEntry{
> > >   // Array for buffer transaction log data.
> > >   repeated TransactionMetadataEntry transaction_log = 12;
> > > }
> > > ```
> > >
> > > BatchedPendingAckMetadataEntry
> > > ```
> > > message BatchedPendingAckMetadataEntry{
> > >   // Array for buffer pending ack data.
> > >   repeated PendingAckMetadataEntry pending_ack_log=6;
> > > }
> > > ```
> > >
> > > Note: To ensure forward compatibility, we need to distinguish the old
> > > TransactionMetadataEntry/PendingAckMetadataEntry data from the new
> > > BatchedTransactionData data, and we add A magic number in front of the
> > > bytes that proto serializes:
> > > ```
> > > [Magic Num] [PendingAckMetadataEntry proto bytes]  ==>  [Enrty]
> > > ```
> > > Read Entry:
> > > ```
> > >                            /-- true -->
> [BatchedTransactionMetadataEntry]
> > > [Entry] --> has Magic Num ?
> > >                            \-- false --> [TransactionMetadataEntry]
> > > ```
> > >
> > >
> > > ### API Changes
> > >
> > > BatchAddDataCallback
> > > The Transaction Coordinator does not directly operate the Managed
> Ledger,
> > > uses the Transaction Log Store to operate on Managed Ledger. The
> Managed
> > > Ledger write API provides a callback class: AddEntryCallback, the same
> > > Transaction Log Store that provides bulk writes, provides a callback
> > class:
> > > BatchAddDataCallback. <a
> href="#BatchedAddDataCallbackExplains">Explains
> > > why do we need BatchAddDataCallback </a>.
> > >
> > > ![WechatIMG7](
> > >
> >
> https://user-images.githubusercontent.com/25195800/173034341-8d44a8b1-9dde-45ee-8525-b72365def640.jpeg
> > > )
> > > Figure.BatchAddDataCallback in Write Flow
> > >
> > > ```java
> > > interface BatchAddDataCallback {
> > >     /**
> > >      * Successed callback function for “add data asynchronously”
> > >      *
> > >      * @param posotion A Position is a pointer to a specific entry into
> > the
> > > managed ledger.
> > >      * @param byteBuf The raw data which added.
> > >      * @param batchIndex Raw data count in The whole Batched Entry.
> > >      * @param batchSize The current raw data index in the batch.
> > >      * @param ctx opaque context
> > >      */
> > >     void addComplete(Position position, ByteBuf byteBuf, int
> batchIndex,
> > > int batchSize, Object context);
> > >     /**
> > >      * Failure callback function for “add data asynchronously”
> > >      *
> > >      * @param ctx opaque context
> > >      */
> > >     void addFailed(ManagedLedgerException exception, Object ctx);
> > > }
> > > ```
> > >
> > > ### Configuration Changes
> > > Add the Batch threshold parameters to control the refresh frequency.
> > >
> > > broker.conf
> > > ```
> > > transactionLogBatchedWriteEnabled = false;
> > > transactionLogBatchedWriteMaxRecords= 512;
> > > transactionLogBatchedWriteMaxSize= 1024 * 1024 * 4;
> > > transactionLogBatchedWriteMaxDelayInMillis= 1;
> > >
> > > pendingAckBatchedWriteEnabled = false;
> > > pendingAckBatchedWriteMaxRecords= 512;
> > > pendingAckBatchedWriteMaxSize= 1024 * 1024 * 4;
> > > pendingAckBatchedWriteMaxDelayInMillis= 1;
> > > ```
> > >
> > > ### Compatibility
> > > After the batch feature is enabled, users can only downgrade to the
> > larger
> > > than “first version that supports BatchedTransactionMeta reading” to
> > > consume data. Data in a lower version broker cannot be parsed,
> resulting
> > in
> > > data loss. We also provide <a href="#librarySupports"> Library support
> > for
> > > Compatibility with older versions Broker</a>, If the user uses this
> > library
> > > on older version Broker<sup>[0]</sup>, all new data results can be
> > > processed correctly and none of the data will be lost.
> > >
> > > ----
> > > **[0]old version Broker**: Not less than 2.9.2 and 2.10
> > >
> > > ### Observability
> > > When using the Batch feature, users will adjust the frequency of disk
> > > brushing to achieve the optimal performance. We provide two observable
> > > indicators for users' reference
> > >
> > > ```
> > > BatchedDataStoreMXBeanImpl {
> > >     /** The number of logs in each batch. **/
> > >     Rate batchRecordCount;
> > >     /** The size of each batch. **/
> > >     Rate batchSizeCount;
> > > }
> > > ```
> > >
> > > ## Test plan
> > > The test should cover the following cases:
> > >
> > > - The batch mechanism works abides by the total count, total size, and
> > max
> > > delay limitation.
> > > - The returned position for writing data is correct.
> > > - The managedCursor can delete and mark delete the
> > BatchedTransactionMeta.
> > > - Performance tests and compare before-after improvement.
> > >
> > > ## The appendix
> > >
> > > ### <p id="BatchedAddDataCallbackExplains"> Explains why do we need
> > > BatchAddDataCallback  </p>
> > > After all produced messages and acknowledgements to all partitions are
> > > committed or aborted, the TC writes the final COMMITTED or ABORTED
> > > transaction status message to its transaction log, indicating that the
> > > transaction is complete (shown as 3.3a in the diagram). At this point,
> > all
> > > the messages pertaining to the transaction in its transaction log can
> > > safely be removed.
> > > e.g. There are two transactions:
> > > ![截屏2022-06-10 11 56 49](
> > >
> >
> https://user-images.githubusercontent.com/25195800/172987382-fc4ddf9a-e21c-437f-900b-cd681d8d9364.png
> > > )
> > > Transaction Log Write:
> > > ```
> > > transaction_1: start transaction
> > > transaction_2: start transaction
> > > transaction_1: add partition to tx
> > > transaction_1: add subscription to tx
> > > transaction_2: add partition to tx
> > > transaction_1: commit
> > > ```
> > > Bookie Write:
> > > ```
> > > [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> > > ```
> > > Bookie Response:
> > > ```
> > > {ledgerId=2, entryId=3}
> > > ```
> > > Transaction Log callback:
> > > ```
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5,
> > ctx}
> > > ```
> > > The entry(2,3) actually has 6 transaction logs, transaction_1 relations
> > to
> > > [0,2,3,5] and transaction_2 relations to [1,4].
> > > After transaction_1 is committed,the logs [0,2,3,5] of Entry(2,3) are
> not
> > > needed because the transaction has already been completed, but now we
> > could
> > > not delete Entry(2,3), Because the logs [1,4] are still useful that
> > > transaction_2 is not finished and we still need them for the recovery
> > > operation.
> > > The BatchIndex and BatchSize can clearly indicate the location of each
> > > transaction log in the ledger. When the transaction log is no longer
> > used,
> > > Users can accurately delete it according to position and batchIndex.
> > >
> > > ### <p id="librarySupports">Library support for Compatibility with
> older
> > > versions Broker</p>
> > > In broker.conf we can configure the
> > > [transactionMetadataStoreProviderClassName](
> > >
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535
> > )
> > > to replace the implementation of TransactionLog, we can also configure
> > the
> > > [transactionPendingAckStoreProviderClassName](
> > >
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549
> > )
> > > to replace the implementation of PendingAckStore,
> > > We will provide a library containing the classes which can read batched
> > > transaction logs and pending ack logs:
> > >
> > > #### TransactionLogBatchEnryReadableImpl
> > > ```java
> > > public class TransactionLogBatchEnryReadableImpl extends
> > > MLTransactionLogImpl {
> > >     /**
> > >      * Different from the parent class, when this method reads an
> Entry,
> > it
> > > can identify
> > >      * whether the Entry is transction log or batched transaction log.
> If
> > > the Entry is a
> > >     * transaction log, it maintains the same logic as the parent class.
> > If
> > > the transaction log
> > >     * is batched, it will be split into transaction log and processed
> > > according to the original
> > >     * logic
> > >      */
> > >     @Override
> > >     void replayAsync(TransactionLogReplayCallback
> > > transactionLogReplayCallback);
> > > }
> > > ```
> > >
> > > #### PendingAckStoreBatchEntryReadableImpl
> > > ```java
> > > public class PendingAckStoreBatchEntryReadableImpl extends
> > > MLPendingAckStore {
> > >     /**
> > >      * Different from the parent class, when this method reads an
> Entry,
> > it
> > > can identify
> > >      * whether the Entry is pending ack log or batched pending ack log.
> > If
> > > the Entry is a
> > >     * pending ack log, it maintains the same logic as the parent class.
> > If
> > > the pending ack
> > >     * log is batched, it will be split into pending ack log and
> processed
> > > according to the
> > >     * original logic
> > >      */
> > >     void replayAsync(pendingAckHandle, executorService);
> > > }
> > > ```
> > >
> > > How to use this library
> > > 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib
> > > 2. Edit broker.conf. Set transactionMetadataStoreProviderClassName is
> > >
> >
> “org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”,
> > > set transactionPendingAckStoreProviderClassName is
> > >
> >
> “org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
> > > 3. Restart broker.
> >
>

Re: [DISCUSS] [PIP-160] Batch writing ledger for transaction operation

Posted by Xiangying Meng <xi...@apache.org>.
I think this is a nice optimization of transaction internal components.
But I have two little questions:
1. I noticed you write conf transactionMetadataStoreProviderClassName is
configured in broker.conf. Should this be added to `Configuration Changes`?
2. This proposal will add a batch mechanism for pending ack, but this
proposal is no detailed description for pending ack similar to the
description of metadata log batch.

On Fri, Jun 10, 2022 at 7:15 PM Enrico Olivelli <eo...@gmail.com> wrote:

> I have read the PIP, and overall I agree with the design.
> Good work !
>
> I am not sure I understand the part of making it configurable via a
> classname.
> I believe it is better to simply have a flag
> "transactionEnableBatchWrites".
> Otherwise the matrix of possible implementations will grow without limits.
>
> Enrico
>
> Il giorno ven 10 giu 2022 alle ore 11:35 Yubiao Feng
> <yu...@streamnative.io.invalid> ha scritto:
> >
> > Hi Pulsar community:
> >
> > I open a pip to discuss "Batch writing ledger for transaction operation"
> >
> > Proposal Link: https://github.com/apache/pulsar/issues/15370
> >
> > ## Motivation
> >
> > Before reading the background, I suggest you read section “Transaction
> > Flow” of [PIP-31: Transactional Streaming](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > )
> >
> > ### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow
> </p>
> > ![MG3](
> >
> https://user-images.githubusercontent.com/25195800/172985866-25e496a4-ea93-42ec-aa0d-e6a02aa0635e.jpeg
> > )
> > In *Figure 1. Normal Flow vs. Transaction Flow*:
> > - The gray square boxes represent logical components.
> > - All the blue boxes represent logs. The logs are usually Managed ledger
> > - Each arrow represents the request flow or message flow. These
> operations
> > occur in sequence indicated by the numbers next to each arrow.
> > - The black arrows indicate those shared by transaction and normal flow.
> > - The blue arrows represent normal-message-specific flow.
> > - The orange arrows represent transaction-message-specific flow.
> > - The sections below are numbered to match the operations showed in the
> > diagram(differ from [PIP-31: Transactional Streaming](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > ))
> >
> >
> > #### 2.4a Write logs to ledger which Acknowledgement State is PENDING_ACK
> > [Acknowledgement State Machine](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u
> )
> > tells about the changes of the Acknowledge State and why we need
> persistent
> > “The Log which the Acknowledgement State is PENDING_ACK”.
> > #### 2.4a’ Mark messages is no longer useful with current subscription
> > Update `Cursor` to mark the messages as DELETED. So they can be deleted.
> > #### 3.2b Mark messages is no longer useful with current subscription
> > The implementation here is exactly the same as 2.4a’, except that the
> > execution is triggered later, after the Transaction has been committed.
> >
> >
> > ### Analyze the performance cost of transaction
> > As you can see <a href="#normalFlowVsTransaction">Figure 1. Normal Flow
> vs.
> > Transaction Flow]</a>: 2.4a 'and 3.2b are exactly the same logic, so the
> > remaining orange arrows are the additional performance overhead of all
> > transactions.
> > In terms of whether or not each transaction is executed multiple times,
> we
> > can split the flow into two classes(Optimizing a process that is executed
> > multiple times will yield more benefits):
> > - Executed once each transaction: flow-1.x and flow-3.x
> > - Executed multiple times each transaction: flow-2.x
> >
> > So optimizing the flow 2.x with a lot of execution is a good choice.
> Let's
> > split flow-2.x into two groups: those that cost more and those that cost
> > less:
> > - No disk written: flow-2.1 and fow-2.3
> > - Disk written: fow-2.1a, fow-2.3a, flow-2.4a
> >
> > From the previous analysis, we found that optimizing flow-2.1a,
> flow-2.3a,
> > flow-2.4a would bring the most benefits, and batch writes would be an
> > excellent solution for multiple disk writes. Flow-2.1a and Flow-2.3a are
> > both manipulations written into the transaction log, we can combine them
> in
> > one batch; 2.4a is the operation of writing pending ACK log, we combine
> > multiple 2.4a's into one batch for processing.
> > As we can see from “Transaction Flow” of [PIP-31: Transactional
> Streaming](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> ),
> > these instructions are strictly sequential (guaranteed by the client):
> > - flow-1.x end before flow-2.x start
> > - flow-2.x end before flow-3.x start
> > - flow-3.1a end before flow-3.3a start
> >
> > Therefore, the broker does not need to worry about the dependency of
> these
> > flows, we can also put flow-1a flow-31a and flow-3.3a into The
> Transaction
> > Log Batch too.
> >
> > ## Goal
> > Provide a mechanism for Transaction Log Store and Pending Ack Store:
> accept
> > multiple write requests, buffer all those records, and persist to a
> single
> > BK entry(aka “Batched Entry”). This will improve broker->BK throughput.
> > - Allow users to specify control of the max size, max record of The
> Buffer.
> > - Allow users to specify control max delay time of The Write Request.
> > - Multiple raw data can be recovered from Batched Entry.
> > - Configurable “batched implementation” and “common implementation”
> switch.
> >
> > ## Approach
> > ### Buffer requests and write Bookie
> > Create a new protobuf record called “Batched Transaction Data” with an
> > array inside. When receive a request, we put it in the array.
> >
> > Request:
> > ```
> > [Request 1]{ data, callback }
> > [Request 2]]{ data, callback }
> > …
> > …
> > [Request N]]{ data, callback }
> > ```
> > Buffer:
> > ```
> > [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> > ```
> > Write Bookie:
> > ```
> > LedgerHandle async write ( BatchedTransactionData to byteBuf )
> > LedgerHandle callback: ledgerId=1, entryId=1
> > ```
> > Request-Callback:
> > ```
> > Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > …
> > …
> > Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > ```
> >
> > ### Delete BatchedTransactionMeta
> > [PIP 45](
> >
> https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level
> )
> > has supported batch index delete. So the Raw Data added to a batch can be
> > with different batch indexes but with the same ledger ID and entry ID.
> >
> > Read:
> > ```
> > [BatchedTransactionData]
> > ```
> > After split:
> > ```
> > {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > …
> > {data 3, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > ```
> > Users can delete whole of the batched Entry:
> > ```java
> > cursor.delete( Position {ledgerId = 1, entryId = 1} )
> > ```
> > Users can also delete only part of the batched Entry:
> > ```java
> > cursor.delete( Position {ledgerId =1, entryId = 1, batchIndex=1} )
> > ```
> >
> > ## Changes
> >
> > ### Protocol Changes
> > New protobuf record to buffer requests.
> >
> > BatchedTransactionMetadataEntry
> > ```
> > message BatchedTransactionMetadataEntry{
> >   // Array for buffer transaction log data.
> >   repeated TransactionMetadataEntry transaction_log = 12;
> > }
> > ```
> >
> > BatchedPendingAckMetadataEntry
> > ```
> > message BatchedPendingAckMetadataEntry{
> >   // Array for buffer pending ack data.
> >   repeated PendingAckMetadataEntry pending_ack_log=6;
> > }
> > ```
> >
> > Note: To ensure forward compatibility, we need to distinguish the old
> > TransactionMetadataEntry/PendingAckMetadataEntry data from the new
> > BatchedTransactionData data, and we add A magic number in front of the
> > bytes that proto serializes:
> > ```
> > [Magic Num] [PendingAckMetadataEntry proto bytes]  ==>  [Enrty]
> > ```
> > Read Entry:
> > ```
> >                            /-- true --> [BatchedTransactionMetadataEntry]
> > [Entry] --> has Magic Num ?
> >                            \-- false --> [TransactionMetadataEntry]
> > ```
> >
> >
> > ### API Changes
> >
> > BatchAddDataCallback
> > The Transaction Coordinator does not directly operate the Managed Ledger,
> > uses the Transaction Log Store to operate on Managed Ledger. The Managed
> > Ledger write API provides a callback class: AddEntryCallback, the same
> > Transaction Log Store that provides bulk writes, provides a callback
> class:
> > BatchAddDataCallback. <a href="#BatchedAddDataCallbackExplains">Explains
> > why do we need BatchAddDataCallback </a>.
> >
> > ![WechatIMG7](
> >
> https://user-images.githubusercontent.com/25195800/173034341-8d44a8b1-9dde-45ee-8525-b72365def640.jpeg
> > )
> > Figure.BatchAddDataCallback in Write Flow
> >
> > ```java
> > interface BatchAddDataCallback {
> >     /**
> >      * Successed callback function for “add data asynchronously”
> >      *
> >      * @param posotion A Position is a pointer to a specific entry into
> the
> > managed ledger.
> >      * @param byteBuf The raw data which added.
> >      * @param batchIndex Raw data count in The whole Batched Entry.
> >      * @param batchSize The current raw data index in the batch.
> >      * @param ctx opaque context
> >      */
> >     void addComplete(Position position, ByteBuf byteBuf, int batchIndex,
> > int batchSize, Object context);
> >     /**
> >      * Failure callback function for “add data asynchronously”
> >      *
> >      * @param ctx opaque context
> >      */
> >     void addFailed(ManagedLedgerException exception, Object ctx);
> > }
> > ```
> >
> > ### Configuration Changes
> > Add the Batch threshold parameters to control the refresh frequency.
> >
> > broker.conf
> > ```
> > transactionLogBatchedWriteEnabled = false;
> > transactionLogBatchedWriteMaxRecords= 512;
> > transactionLogBatchedWriteMaxSize= 1024 * 1024 * 4;
> > transactionLogBatchedWriteMaxDelayInMillis= 1;
> >
> > pendingAckBatchedWriteEnabled = false;
> > pendingAckBatchedWriteMaxRecords= 512;
> > pendingAckBatchedWriteMaxSize= 1024 * 1024 * 4;
> > pendingAckBatchedWriteMaxDelayInMillis= 1;
> > ```
> >
> > ### Compatibility
> > After the batch feature is enabled, users can only downgrade to the
> larger
> > than “first version that supports BatchedTransactionMeta reading” to
> > consume data. Data in a lower version broker cannot be parsed, resulting
> in
> > data loss. We also provide <a href="#librarySupports"> Library support
> for
> > Compatibility with older versions Broker</a>, If the user uses this
> library
> > on older version Broker<sup>[0]</sup>, all new data results can be
> > processed correctly and none of the data will be lost.
> >
> > ----
> > **[0]old version Broker**: Not less than 2.9.2 and 2.10
> >
> > ### Observability
> > When using the Batch feature, users will adjust the frequency of disk
> > brushing to achieve the optimal performance. We provide two observable
> > indicators for users' reference
> >
> > ```
> > BatchedDataStoreMXBeanImpl {
> >     /** The number of logs in each batch. **/
> >     Rate batchRecordCount;
> >     /** The size of each batch. **/
> >     Rate batchSizeCount;
> > }
> > ```
> >
> > ## Test plan
> > The test should cover the following cases:
> >
> > - The batch mechanism works abides by the total count, total size, and
> max
> > delay limitation.
> > - The returned position for writing data is correct.
> > - The managedCursor can delete and mark delete the
> BatchedTransactionMeta.
> > - Performance tests and compare before-after improvement.
> >
> > ## The appendix
> >
> > ### <p id="BatchedAddDataCallbackExplains"> Explains why do we need
> > BatchAddDataCallback  </p>
> > After all produced messages and acknowledgements to all partitions are
> > committed or aborted, the TC writes the final COMMITTED or ABORTED
> > transaction status message to its transaction log, indicating that the
> > transaction is complete (shown as 3.3a in the diagram). At this point,
> all
> > the messages pertaining to the transaction in its transaction log can
> > safely be removed.
> > e.g. There are two transactions:
> > ![截屏2022-06-10 11 56 49](
> >
> https://user-images.githubusercontent.com/25195800/172987382-fc4ddf9a-e21c-437f-900b-cd681d8d9364.png
> > )
> > Transaction Log Write:
> > ```
> > transaction_1: start transaction
> > transaction_2: start transaction
> > transaction_1: add partition to tx
> > transaction_1: add subscription to tx
> > transaction_2: add partition to tx
> > transaction_1: commit
> > ```
> > Bookie Write:
> > ```
> > [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> > ```
> > Bookie Response:
> > ```
> > {ledgerId=2, entryId=3}
> > ```
> > Transaction Log callback:
> > ```
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5,
> ctx}
> > ```
> > The entry(2,3) actually has 6 transaction logs, transaction_1 relations
> to
> > [0,2,3,5] and transaction_2 relations to [1,4].
> > After transaction_1 is committed,the logs [0,2,3,5] of Entry(2,3) are not
> > needed because the transaction has already been completed, but now we
> could
> > not delete Entry(2,3), Because the logs [1,4] are still useful that
> > transaction_2 is not finished and we still need them for the recovery
> > operation.
> > The BatchIndex and BatchSize can clearly indicate the location of each
> > transaction log in the ledger. When the transaction log is no longer
> used,
> > Users can accurately delete it according to position and batchIndex.
> >
> > ### <p id="librarySupports">Library support for Compatibility with older
> > versions Broker</p>
> > In broker.conf we can configure the
> > [transactionMetadataStoreProviderClassName](
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535
> )
> > to replace the implementation of TransactionLog, we can also configure
> the
> > [transactionPendingAckStoreProviderClassName](
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549
> )
> > to replace the implementation of PendingAckStore,
> > We will provide a library containing the classes which can read batched
> > transaction logs and pending ack logs:
> >
> > #### TransactionLogBatchEnryReadableImpl
> > ```java
> > public class TransactionLogBatchEnryReadableImpl extends
> > MLTransactionLogImpl {
> >     /**
> >      * Different from the parent class, when this method reads an Entry,
> it
> > can identify
> >      * whether the Entry is transction log or batched transaction log. If
> > the Entry is a
> >     * transaction log, it maintains the same logic as the parent class.
> If
> > the transaction log
> >     * is batched, it will be split into transaction log and processed
> > according to the original
> >     * logic
> >      */
> >     @Override
> >     void replayAsync(TransactionLogReplayCallback
> > transactionLogReplayCallback);
> > }
> > ```
> >
> > #### PendingAckStoreBatchEntryReadableImpl
> > ```java
> > public class PendingAckStoreBatchEntryReadableImpl extends
> > MLPendingAckStore {
> >     /**
> >      * Different from the parent class, when this method reads an Entry,
> it
> > can identify
> >      * whether the Entry is pending ack log or batched pending ack log.
> If
> > the Entry is a
> >     * pending ack log, it maintains the same logic as the parent class.
> If
> > the pending ack
> >     * log is batched, it will be split into pending ack log and processed
> > according to the
> >     * original logic
> >      */
> >     void replayAsync(pendingAckHandle, executorService);
> > }
> > ```
> >
> > How to use this library
> > 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib
> > 2. Edit broker.conf. Set transactionMetadataStoreProviderClassName is
> >
> “org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”,
> > set transactionPendingAckStoreProviderClassName is
> >
> “org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
> > 3. Restart broker.
>

Re: [DISCUSS] [PIP-160] Batch writing ledger for transaction operation

Posted by Yubiao Feng <yu...@streamnative.io.INVALID>.
> I am not sure I understand the part of making it configurable via a
classname.
I believe it is better to simply have a flag "transactionEnableBatchWrites".
Otherwise the matrix of possible implementations will grow without limits.

Good idea, I've modified the design and added a switch in the Configure
Changes section.

On Fri, Jun 10, 2022 at 7:14 PM Enrico Olivelli <eo...@gmail.com> wrote:

> I have read the PIP, and overall I agree with the design.
> Good work !
>
> I am not sure I understand the part of making it configurable via a
> classname.
> I believe it is better to simply have a flag
> "transactionEnableBatchWrites".
> Otherwise the matrix of possible implementations will grow without limits.
>
> Enrico
>
> Il giorno ven 10 giu 2022 alle ore 11:35 Yubiao Feng
> <yu...@streamnative.io.invalid> ha scritto:
> >
> > Hi Pulsar community:
> >
> > I open a pip to discuss "Batch writing ledger for transaction operation"
> >
> > Proposal Link: https://github.com/apache/pulsar/issues/15370
> >
> > ## Motivation
> >
> > Before reading the background, I suggest you read section “Transaction
> > Flow” of [PIP-31: Transactional Streaming](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > )
> >
> > ### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow
> </p>
> > ![MG3](
> >
> https://user-images.githubusercontent.com/25195800/172985866-25e496a4-ea93-42ec-aa0d-e6a02aa0635e.jpeg
> > )
> > In *Figure 1. Normal Flow vs. Transaction Flow*:
> > - The gray square boxes represent logical components.
> > - All the blue boxes represent logs. The logs are usually Managed ledger
> > - Each arrow represents the request flow or message flow. These
> operations
> > occur in sequence indicated by the numbers next to each arrow.
> > - The black arrows indicate those shared by transaction and normal flow.
> > - The blue arrows represent normal-message-specific flow.
> > - The orange arrows represent transaction-message-specific flow.
> > - The sections below are numbered to match the operations showed in the
> > diagram(differ from [PIP-31: Transactional Streaming](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > ))
> >
> >
> > #### 2.4a Write logs to ledger which Acknowledgement State is PENDING_ACK
> > [Acknowledgement State Machine](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u
> )
> > tells about the changes of the Acknowledge State and why we need
> persistent
> > “The Log which the Acknowledgement State is PENDING_ACK”.
> > #### 2.4a’ Mark messages is no longer useful with current subscription
> > Update `Cursor` to mark the messages as DELETED. So they can be deleted.
> > #### 3.2b Mark messages is no longer useful with current subscription
> > The implementation here is exactly the same as 2.4a’, except that the
> > execution is triggered later, after the Transaction has been committed.
> >
> >
> > ### Analyze the performance cost of transaction
> > As you can see <a href="#normalFlowVsTransaction">Figure 1. Normal Flow
> vs.
> > Transaction Flow]</a>: 2.4a 'and 3.2b are exactly the same logic, so the
> > remaining orange arrows are the additional performance overhead of all
> > transactions.
> > In terms of whether or not each transaction is executed multiple times,
> we
> > can split the flow into two classes(Optimizing a process that is executed
> > multiple times will yield more benefits):
> > - Executed once each transaction: flow-1.x and flow-3.x
> > - Executed multiple times each transaction: flow-2.x
> >
> > So optimizing the flow 2.x with a lot of execution is a good choice.
> Let's
> > split flow-2.x into two groups: those that cost more and those that cost
> > less:
> > - No disk written: flow-2.1 and fow-2.3
> > - Disk written: fow-2.1a, fow-2.3a, flow-2.4a
> >
> > From the previous analysis, we found that optimizing flow-2.1a,
> flow-2.3a,
> > flow-2.4a would bring the most benefits, and batch writes would be an
> > excellent solution for multiple disk writes. Flow-2.1a and Flow-2.3a are
> > both manipulations written into the transaction log, we can combine them
> in
> > one batch; 2.4a is the operation of writing pending ACK log, we combine
> > multiple 2.4a's into one batch for processing.
> > As we can see from “Transaction Flow” of [PIP-31: Transactional
> Streaming](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> ),
> > these instructions are strictly sequential (guaranteed by the client):
> > - flow-1.x end before flow-2.x start
> > - flow-2.x end before flow-3.x start
> > - flow-3.1a end before flow-3.3a start
> >
> > Therefore, the broker does not need to worry about the dependency of
> these
> > flows, we can also put flow-1a flow-31a and flow-3.3a into The
> Transaction
> > Log Batch too.
> >
> > ## Goal
> > Provide a mechanism for Transaction Log Store and Pending Ack Store:
> accept
> > multiple write requests, buffer all those records, and persist to a
> single
> > BK entry(aka “Batched Entry”). This will improve broker->BK throughput.
> > - Allow users to specify control of the max size, max record of The
> Buffer.
> > - Allow users to specify control max delay time of The Write Request.
> > - Multiple raw data can be recovered from Batched Entry.
> > - Configurable “batched implementation” and “common implementation”
> switch.
> >
> > ## Approach
> > ### Buffer requests and write Bookie
> > Create a new protobuf record called “Batched Transaction Data” with an
> > array inside. When receive a request, we put it in the array.
> >
> > Request:
> > ```
> > [Request 1]{ data, callback }
> > [Request 2]]{ data, callback }
> > …
> > …
> > [Request N]]{ data, callback }
> > ```
> > Buffer:
> > ```
> > [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> > ```
> > Write Bookie:
> > ```
> > LedgerHandle async write ( BatchedTransactionData to byteBuf )
> > LedgerHandle callback: ledgerId=1, entryId=1
> > ```
> > Request-Callback:
> > ```
> > Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > …
> > …
> > Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > ```
> >
> > ### Delete BatchedTransactionMeta
> > [PIP 45](
> >
> https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level
> )
> > has supported batch index delete. So the Raw Data added to a batch can be
> > with different batch indexes but with the same ledger ID and entry ID.
> >
> > Read:
> > ```
> > [BatchedTransactionData]
> > ```
> > After split:
> > ```
> > {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > …
> > {data 3, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > ```
> > Users can delete whole of the batched Entry:
> > ```java
> > cursor.delete( Position {ledgerId = 1, entryId = 1} )
> > ```
> > Users can also delete only part of the batched Entry:
> > ```java
> > cursor.delete( Position {ledgerId =1, entryId = 1, batchIndex=1} )
> > ```
> >
> > ## Changes
> >
> > ### Protocol Changes
> > New protobuf record to buffer requests.
> >
> > BatchedTransactionMetadataEntry
> > ```
> > message BatchedTransactionMetadataEntry{
> >   // Array for buffer transaction log data.
> >   repeated TransactionMetadataEntry transaction_log = 12;
> > }
> > ```
> >
> > BatchedPendingAckMetadataEntry
> > ```
> > message BatchedPendingAckMetadataEntry{
> >   // Array for buffer pending ack data.
> >   repeated PendingAckMetadataEntry pending_ack_log=6;
> > }
> > ```
> >
> > Note: To ensure forward compatibility, we need to distinguish the old
> > TransactionMetadataEntry/PendingAckMetadataEntry data from the new
> > BatchedTransactionData data, and we add A magic number in front of the
> > bytes that proto serializes:
> > ```
> > [Magic Num] [PendingAckMetadataEntry proto bytes]  ==>  [Enrty]
> > ```
> > Read Entry:
> > ```
> >                            /-- true --> [BatchedTransactionMetadataEntry]
> > [Entry] --> has Magic Num ?
> >                            \-- false --> [TransactionMetadataEntry]
> > ```
> >
> >
> > ### API Changes
> >
> > BatchAddDataCallback
> > The Transaction Coordinator does not directly operate the Managed Ledger,
> > uses the Transaction Log Store to operate on Managed Ledger. The Managed
> > Ledger write API provides a callback class: AddEntryCallback, the same
> > Transaction Log Store that provides bulk writes, provides a callback
> class:
> > BatchAddDataCallback. <a href="#BatchedAddDataCallbackExplains">Explains
> > why do we need BatchAddDataCallback </a>.
> >
> > ![WechatIMG7](
> >
> https://user-images.githubusercontent.com/25195800/173034341-8d44a8b1-9dde-45ee-8525-b72365def640.jpeg
> > )
> > Figure.BatchAddDataCallback in Write Flow
> >
> > ```java
> > interface BatchAddDataCallback {
> >     /**
> >      * Successed callback function for “add data asynchronously”
> >      *
> >      * @param posotion A Position is a pointer to a specific entry into
> the
> > managed ledger.
> >      * @param byteBuf The raw data which added.
> >      * @param batchIndex Raw data count in The whole Batched Entry.
> >      * @param batchSize The current raw data index in the batch.
> >      * @param ctx opaque context
> >      */
> >     void addComplete(Position position, ByteBuf byteBuf, int batchIndex,
> > int batchSize, Object context);
> >     /**
> >      * Failure callback function for “add data asynchronously”
> >      *
> >      * @param ctx opaque context
> >      */
> >     void addFailed(ManagedLedgerException exception, Object ctx);
> > }
> > ```
> >
> > ### Configuration Changes
> > Add the Batch threshold parameters to control the refresh frequency.
> >
> > broker.conf
> > ```
> > transactionLogBatchedWriteEnabled = false;
> > transactionLogBatchedWriteMaxRecords= 512;
> > transactionLogBatchedWriteMaxSize= 1024 * 1024 * 4;
> > transactionLogBatchedWriteMaxDelayInMillis= 1;
> >
> > pendingAckBatchedWriteEnabled = false;
> > pendingAckBatchedWriteMaxRecords= 512;
> > pendingAckBatchedWriteMaxSize= 1024 * 1024 * 4;
> > pendingAckBatchedWriteMaxDelayInMillis= 1;
> > ```
> >
> > ### Compatibility
> > After the batch feature is enabled, users can only downgrade to the
> larger
> > than “first version that supports BatchedTransactionMeta reading” to
> > consume data. Data in a lower version broker cannot be parsed, resulting
> in
> > data loss. We also provide <a href="#librarySupports"> Library support
> for
> > Compatibility with older versions Broker</a>, If the user uses this
> library
> > on older version Broker<sup>[0]</sup>, all new data results can be
> > processed correctly and none of the data will be lost.
> >
> > ----
> > **[0]old version Broker**: Not less than 2.9.2 and 2.10
> >
> > ### Observability
> > When using the Batch feature, users will adjust the frequency of disk
> > brushing to achieve the optimal performance. We provide two observable
> > indicators for users' reference
> >
> > ```
> > BatchedDataStoreMXBeanImpl {
> >     /** The number of logs in each batch. **/
> >     Rate batchRecordCount;
> >     /** The size of each batch. **/
> >     Rate batchSizeCount;
> > }
> > ```
> >
> > ## Test plan
> > The test should cover the following cases:
> >
> > - The batch mechanism works abides by the total count, total size, and
> max
> > delay limitation.
> > - The returned position for writing data is correct.
> > - The managedCursor can delete and mark delete the
> BatchedTransactionMeta.
> > - Performance tests and compare before-after improvement.
> >
> > ## The appendix
> >
> > ### <p id="BatchedAddDataCallbackExplains"> Explains why do we need
> > BatchAddDataCallback  </p>
> > After all produced messages and acknowledgements to all partitions are
> > committed or aborted, the TC writes the final COMMITTED or ABORTED
> > transaction status message to its transaction log, indicating that the
> > transaction is complete (shown as 3.3a in the diagram). At this point,
> all
> > the messages pertaining to the transaction in its transaction log can
> > safely be removed.
> > e.g. There are two transactions:
> > ![截屏2022-06-10 11 56 49](
> >
> https://user-images.githubusercontent.com/25195800/172987382-fc4ddf9a-e21c-437f-900b-cd681d8d9364.png
> > )
> > Transaction Log Write:
> > ```
> > transaction_1: start transaction
> > transaction_2: start transaction
> > transaction_1: add partition to tx
> > transaction_1: add subscription to tx
> > transaction_2: add partition to tx
> > transaction_1: commit
> > ```
> > Bookie Write:
> > ```
> > [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> > ```
> > Bookie Response:
> > ```
> > {ledgerId=2, entryId=3}
> > ```
> > Transaction Log callback:
> > ```
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5,
> ctx}
> > ```
> > The entry(2,3) actually has 6 transaction logs, transaction_1 relations
> to
> > [0,2,3,5] and transaction_2 relations to [1,4].
> > After transaction_1 is committed,the logs [0,2,3,5] of Entry(2,3) are not
> > needed because the transaction has already been completed, but now we
> could
> > not delete Entry(2,3), Because the logs [1,4] are still useful that
> > transaction_2 is not finished and we still need them for the recovery
> > operation.
> > The BatchIndex and BatchSize can clearly indicate the location of each
> > transaction log in the ledger. When the transaction log is no longer
> used,
> > Users can accurately delete it according to position and batchIndex.
> >
> > ### <p id="librarySupports">Library support for Compatibility with older
> > versions Broker</p>
> > In broker.conf we can configure the
> > [transactionMetadataStoreProviderClassName](
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535
> )
> > to replace the implementation of TransactionLog, we can also configure
> the
> > [transactionPendingAckStoreProviderClassName](
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549
> )
> > to replace the implementation of PendingAckStore,
> > We will provide a library containing the classes which can read batched
> > transaction logs and pending ack logs:
> >
> > #### TransactionLogBatchEnryReadableImpl
> > ```java
> > public class TransactionLogBatchEnryReadableImpl extends
> > MLTransactionLogImpl {
> >     /**
> >      * Different from the parent class, when this method reads an Entry,
> it
> > can identify
> >      * whether the Entry is transction log or batched transaction log. If
> > the Entry is a
> >     * transaction log, it maintains the same logic as the parent class.
> If
> > the transaction log
> >     * is batched, it will be split into transaction log and processed
> > according to the original
> >     * logic
> >      */
> >     @Override
> >     void replayAsync(TransactionLogReplayCallback
> > transactionLogReplayCallback);
> > }
> > ```
> >
> > #### PendingAckStoreBatchEntryReadableImpl
> > ```java
> > public class PendingAckStoreBatchEntryReadableImpl extends
> > MLPendingAckStore {
> >     /**
> >      * Different from the parent class, when this method reads an Entry,
> it
> > can identify
> >      * whether the Entry is pending ack log or batched pending ack log.
> If
> > the Entry is a
> >     * pending ack log, it maintains the same logic as the parent class.
> If
> > the pending ack
> >     * log is batched, it will be split into pending ack log and processed
> > according to the
> >     * original logic
> >      */
> >     void replayAsync(pendingAckHandle, executorService);
> > }
> > ```
> >
> > How to use this library
> > 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib
> > 2. Edit broker.conf. Set transactionMetadataStoreProviderClassName is
> >
> “org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”,
> > set transactionPendingAckStoreProviderClassName is
> >
> “org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
> > 3. Restart broker.
>

Re: [DISCUSS] [PIP-160] Batch writing ledger for transaction operation

Posted by Enrico Olivelli <eo...@gmail.com>.
I have read the PIP, and overall I agree with the design.
Good work !

I am not sure I understand the part of making it configurable via a classname.
I believe it is better to simply have a flag "transactionEnableBatchWrites".
Otherwise the matrix of possible implementations will grow without limits.

Enrico

Il giorno ven 10 giu 2022 alle ore 11:35 Yubiao Feng
<yu...@streamnative.io.invalid> ha scritto:
>
> Hi Pulsar community:
>
> I open a pip to discuss "Batch writing ledger for transaction operation"
>
> Proposal Link: https://github.com/apache/pulsar/issues/15370
>
> ## Motivation
>
> Before reading the background, I suggest you read section “Transaction
> Flow” of [PIP-31: Transactional Streaming](
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> )
>
> ### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow </p>
> ![MG3](
> https://user-images.githubusercontent.com/25195800/172985866-25e496a4-ea93-42ec-aa0d-e6a02aa0635e.jpeg
> )
> In *Figure 1. Normal Flow vs. Transaction Flow*:
> - The gray square boxes represent logical components.
> - All the blue boxes represent logs. The logs are usually Managed ledger
> - Each arrow represents the request flow or message flow. These operations
> occur in sequence indicated by the numbers next to each arrow.
> - The black arrows indicate those shared by transaction and normal flow.
> - The blue arrows represent normal-message-specific flow.
> - The orange arrows represent transaction-message-specific flow.
> - The sections below are numbered to match the operations showed in the
> diagram(differ from [PIP-31: Transactional Streaming](
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> ))
>
>
> #### 2.4a Write logs to ledger which Acknowledgement State is PENDING_ACK
> [Acknowledgement State Machine](
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u)
> tells about the changes of the Acknowledge State and why we need persistent
> “The Log which the Acknowledgement State is PENDING_ACK”.
> #### 2.4a’ Mark messages is no longer useful with current subscription
> Update `Cursor` to mark the messages as DELETED. So they can be deleted.
> #### 3.2b Mark messages is no longer useful with current subscription
> The implementation here is exactly the same as 2.4a’, except that the
> execution is triggered later, after the Transaction has been committed.
>
>
> ### Analyze the performance cost of transaction
> As you can see <a href="#normalFlowVsTransaction">Figure 1. Normal Flow vs.
> Transaction Flow]</a>: 2.4a 'and 3.2b are exactly the same logic, so the
> remaining orange arrows are the additional performance overhead of all
> transactions.
> In terms of whether or not each transaction is executed multiple times, we
> can split the flow into two classes(Optimizing a process that is executed
> multiple times will yield more benefits):
> - Executed once each transaction: flow-1.x and flow-3.x
> - Executed multiple times each transaction: flow-2.x
>
> So optimizing the flow 2.x with a lot of execution is a good choice. Let's
> split flow-2.x into two groups: those that cost more and those that cost
> less:
> - No disk written: flow-2.1 and fow-2.3
> - Disk written: fow-2.1a, fow-2.3a, flow-2.4a
>
> From the previous analysis, we found that optimizing flow-2.1a, flow-2.3a,
> flow-2.4a would bring the most benefits, and batch writes would be an
> excellent solution for multiple disk writes. Flow-2.1a and Flow-2.3a are
> both manipulations written into the transaction log, we can combine them in
> one batch; 2.4a is the operation of writing pending ACK log, we combine
> multiple 2.4a's into one batch for processing.
> As we can see from “Transaction Flow” of [PIP-31: Transactional Streaming](
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx),
> these instructions are strictly sequential (guaranteed by the client):
> - flow-1.x end before flow-2.x start
> - flow-2.x end before flow-3.x start
> - flow-3.1a end before flow-3.3a start
>
> Therefore, the broker does not need to worry about the dependency of these
> flows, we can also put flow-1a flow-31a and flow-3.3a into The Transaction
> Log Batch too.
>
> ## Goal
> Provide a mechanism for Transaction Log Store and Pending Ack Store: accept
> multiple write requests, buffer all those records, and persist to a single
> BK entry(aka “Batched Entry”). This will improve broker->BK throughput.
> - Allow users to specify control of the max size, max record of The Buffer.
> - Allow users to specify control max delay time of The Write Request.
> - Multiple raw data can be recovered from Batched Entry.
> - Configurable “batched implementation” and “common implementation” switch.
>
> ## Approach
> ### Buffer requests and write Bookie
> Create a new protobuf record called “Batched Transaction Data” with an
> array inside. When receive a request, we put it in the array.
>
> Request:
> ```
> [Request 1]{ data, callback }
> [Request 2]]{ data, callback }
> …
> …
> [Request N]]{ data, callback }
> ```
> Buffer:
> ```
> [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> ```
> Write Bookie:
> ```
> LedgerHandle async write ( BatchedTransactionData to byteBuf )
> LedgerHandle callback: ledgerId=1, entryId=1
> ```
> Request-Callback:
> ```
> Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> …
> …
> Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> ```
>
> ### Delete BatchedTransactionMeta
> [PIP 45](
> https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level)
> has supported batch index delete. So the Raw Data added to a batch can be
> with different batch indexes but with the same ledger ID and entry ID.
>
> Read:
> ```
> [BatchedTransactionData]
> ```
> After split:
> ```
> {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> …
> {data 3, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> ```
> Users can delete whole of the batched Entry:
> ```java
> cursor.delete( Position {ledgerId = 1, entryId = 1} )
> ```
> Users can also delete only part of the batched Entry:
> ```java
> cursor.delete( Position {ledgerId =1, entryId = 1, batchIndex=1} )
> ```
>
> ## Changes
>
> ### Protocol Changes
> New protobuf record to buffer requests.
>
> BatchedTransactionMetadataEntry
> ```
> message BatchedTransactionMetadataEntry{
>   // Array for buffer transaction log data.
>   repeated TransactionMetadataEntry transaction_log = 12;
> }
> ```
>
> BatchedPendingAckMetadataEntry
> ```
> message BatchedPendingAckMetadataEntry{
>   // Array for buffer pending ack data.
>   repeated PendingAckMetadataEntry pending_ack_log=6;
> }
> ```
>
> Note: To ensure forward compatibility, we need to distinguish the old
> TransactionMetadataEntry/PendingAckMetadataEntry data from the new
> BatchedTransactionData data, and we add A magic number in front of the
> bytes that proto serializes:
> ```
> [Magic Num] [PendingAckMetadataEntry proto bytes]  ==>  [Enrty]
> ```
> Read Entry:
> ```
>                            /-- true --> [BatchedTransactionMetadataEntry]
> [Entry] --> has Magic Num ?
>                            \-- false --> [TransactionMetadataEntry]
> ```
>
>
> ### API Changes
>
> BatchAddDataCallback
> The Transaction Coordinator does not directly operate the Managed Ledger,
> uses the Transaction Log Store to operate on Managed Ledger. The Managed
> Ledger write API provides a callback class: AddEntryCallback, the same
> Transaction Log Store that provides bulk writes, provides a callback class:
> BatchAddDataCallback. <a href="#BatchedAddDataCallbackExplains">Explains
> why do we need BatchAddDataCallback </a>.
>
> ![WechatIMG7](
> https://user-images.githubusercontent.com/25195800/173034341-8d44a8b1-9dde-45ee-8525-b72365def640.jpeg
> )
> Figure.BatchAddDataCallback in Write Flow
>
> ```java
> interface BatchAddDataCallback {
>     /**
>      * Successed callback function for “add data asynchronously”
>      *
>      * @param posotion A Position is a pointer to a specific entry into the
> managed ledger.
>      * @param byteBuf The raw data which added.
>      * @param batchIndex Raw data count in The whole Batched Entry.
>      * @param batchSize The current raw data index in the batch.
>      * @param ctx opaque context
>      */
>     void addComplete(Position position, ByteBuf byteBuf, int batchIndex,
> int batchSize, Object context);
>     /**
>      * Failure callback function for “add data asynchronously”
>      *
>      * @param ctx opaque context
>      */
>     void addFailed(ManagedLedgerException exception, Object ctx);
> }
> ```
>
> ### Configuration Changes
> Add the Batch threshold parameters to control the refresh frequency.
>
> broker.conf
> ```
> transactionLogBatchedWriteEnabled = false;
> transactionLogBatchedWriteMaxRecords= 512;
> transactionLogBatchedWriteMaxSize= 1024 * 1024 * 4;
> transactionLogBatchedWriteMaxDelayInMillis= 1;
>
> pendingAckBatchedWriteEnabled = false;
> pendingAckBatchedWriteMaxRecords= 512;
> pendingAckBatchedWriteMaxSize= 1024 * 1024 * 4;
> pendingAckBatchedWriteMaxDelayInMillis= 1;
> ```
>
> ### Compatibility
> After the batch feature is enabled, users can only downgrade to the larger
> than “first version that supports BatchedTransactionMeta reading” to
> consume data. Data in a lower version broker cannot be parsed, resulting in
> data loss. We also provide <a href="#librarySupports"> Library support for
> Compatibility with older versions Broker</a>, If the user uses this library
> on older version Broker<sup>[0]</sup>, all new data results can be
> processed correctly and none of the data will be lost.
>
> ----
> **[0]old version Broker**: Not less than 2.9.2 and 2.10
>
> ### Observability
> When using the Batch feature, users will adjust the frequency of disk
> brushing to achieve the optimal performance. We provide two observable
> indicators for users' reference
>
> ```
> BatchedDataStoreMXBeanImpl {
>     /** The number of logs in each batch. **/
>     Rate batchRecordCount;
>     /** The size of each batch. **/
>     Rate batchSizeCount;
> }
> ```
>
> ## Test plan
> The test should cover the following cases:
>
> - The batch mechanism works abides by the total count, total size, and max
> delay limitation.
> - The returned position for writing data is correct.
> - The managedCursor can delete and mark delete the BatchedTransactionMeta.
> - Performance tests and compare before-after improvement.
>
> ## The appendix
>
> ### <p id="BatchedAddDataCallbackExplains"> Explains why do we need
> BatchAddDataCallback  </p>
> After all produced messages and acknowledgements to all partitions are
> committed or aborted, the TC writes the final COMMITTED or ABORTED
> transaction status message to its transaction log, indicating that the
> transaction is complete (shown as 3.3a in the diagram). At this point, all
> the messages pertaining to the transaction in its transaction log can
> safely be removed.
> e.g. There are two transactions:
> ![截屏2022-06-10 11 56 49](
> https://user-images.githubusercontent.com/25195800/172987382-fc4ddf9a-e21c-437f-900b-cd681d8d9364.png
> )
> Transaction Log Write:
> ```
> transaction_1: start transaction
> transaction_2: start transaction
> transaction_1: add partition to tx
> transaction_1: add subscription to tx
> transaction_2: add partition to tx
> transaction_1: commit
> ```
> Bookie Write:
> ```
> [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> ```
> Bookie Response:
> ```
> {ledgerId=2, entryId=3}
> ```
> Transaction Log callback:
> ```
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5, ctx}
> ```
> The entry(2,3) actually has 6 transaction logs, transaction_1 relations to
> [0,2,3,5] and transaction_2 relations to [1,4].
> After transaction_1 is committed,the logs [0,2,3,5] of Entry(2,3) are not
> needed because the transaction has already been completed, but now we could
> not delete Entry(2,3), Because the logs [1,4] are still useful that
> transaction_2 is not finished and we still need them for the recovery
> operation.
> The BatchIndex and BatchSize can clearly indicate the location of each
> transaction log in the ledger. When the transaction log is no longer used,
> Users can accurately delete it according to position and batchIndex.
>
> ### <p id="librarySupports">Library support for Compatibility with older
> versions Broker</p>
> In broker.conf we can configure the
> [transactionMetadataStoreProviderClassName](
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535)
> to replace the implementation of TransactionLog, we can also configure the
> [transactionPendingAckStoreProviderClassName](
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549)
> to replace the implementation of PendingAckStore,
> We will provide a library containing the classes which can read batched
> transaction logs and pending ack logs:
>
> #### TransactionLogBatchEnryReadableImpl
> ```java
> public class TransactionLogBatchEnryReadableImpl extends
> MLTransactionLogImpl {
>     /**
>      * Different from the parent class, when this method reads an Entry, it
> can identify
>      * whether the Entry is transction log or batched transaction log. If
> the Entry is a
>     * transaction log, it maintains the same logic as the parent class. If
> the transaction log
>     * is batched, it will be split into transaction log and processed
> according to the original
>     * logic
>      */
>     @Override
>     void replayAsync(TransactionLogReplayCallback
> transactionLogReplayCallback);
> }
> ```
>
> #### PendingAckStoreBatchEntryReadableImpl
> ```java
> public class PendingAckStoreBatchEntryReadableImpl extends
> MLPendingAckStore {
>     /**
>      * Different from the parent class, when this method reads an Entry, it
> can identify
>      * whether the Entry is pending ack log or batched pending ack log. If
> the Entry is a
>     * pending ack log, it maintains the same logic as the parent class. If
> the pending ack
>     * log is batched, it will be split into pending ack log and processed
> according to the
>     * original logic
>      */
>     void replayAsync(pendingAckHandle, executorService);
> }
> ```
>
> How to use this library
> 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib
> 2. Edit broker.conf. Set transactionMetadataStoreProviderClassName is
> “org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”,
> set transactionPendingAckStoreProviderClassName is
> “org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
> 3. Restart broker.