Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/08 16:21:03 UTC

[GitHub] [kafka] vamossagar12 opened a new pull request #10278: KAFKA-10526: leader fsync deferral on write

vamossagar12 opened a new pull request #10278:
URL: https://github.com/apache/kafka/pull/10278


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#issuecomment-850068544


   @jsancio, yes, I will do that. I also have a question. I didn't get a chance to look at this PR for some time, and now I see that things have changed in a way that impacts my implementation strategy.
   
   In the older version, there was a direct `flushLeaderLog()` call from `appendLeaderChangeMessage`. This is the older version:
   
   ```
       private void appendLeaderChangeMessage(LeaderState state, long baseOffset, long currentTimeMs) {
           List<Voter> voters = convertToVoters(state.followers());
           List<Voter> grantingVoters = convertToVoters(state.grantingVoters());
   
           // Adding the leader to the voters as any voter always votes for itself.
           voters.add(new Voter().setVoterId(state.election().leaderId()));
   
           LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
               .setLeaderId(state.election().leaderId())
               .setVoters(voters)
               .setGrantingVoters(grantingVoters);
   
           MemoryRecords records = MemoryRecords.withLeaderChangeMessage(
               baseOffset,
               currentTimeMs,
               quorum.epoch(),
               leaderChangeMessage
           );
   
           appendAsLeader(records);
           // For a LeaderChange message, flushLeaderLogOnHwmUpdate is set to false
           // as we have to flush the log irrespective of whether the HWM moved or not.
           flushLeaderLog(state, currentTimeMs, false);
       }
   
   ```
   I had added a parameter called `flushLeaderLogOnHwmUpdate` to `flushLeaderLog`, which indicated whether the flush depended on the HWM moving. For a control record it did not, so `false` was passed to force an explicit flush, while `maybeAppendBatches` passed `true` to defer the flush.
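The flag-based approach described above could be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the code from the PR: `FlushFlagSketch`, its fields, and the `highWatermarkMoved` argument are hypothetical stand-ins, whereas the `flushLeaderLog` shown above took a `LeaderState` and a timestamp.

```java
// Hypothetical sketch; these are not the real KafkaRaftClient types.
final class FlushFlagSketch {
    private long logEndOffset = 0L;  // offset the next appended record would get
    private long flushedOffset = 0L; // records below this offset are assumed durable
    private int flushCount = 0;      // number of simulated fsync calls

    // Appending only moves the log end offset; nothing is flushed here.
    void append(long recordCount) {
        logEndOffset += recordCount;
    }

    /**
     * Flushes unless the caller asked for deferral and the high watermark
     * has not moved. Returns true if a (simulated) flush was performed.
     */
    boolean flushLeaderLog(boolean flushLeaderLogOnHwmUpdate, boolean highWatermarkMoved) {
        if (flushLeaderLogOnHwmUpdate && !highWatermarkMoved) {
            return false; // deferred: wait for the high watermark to move
        }
        flushedOffset = logEndOffset; // stands in for log.flush()
        flushCount++;
        return true;
    }

    long flushedOffset() { return flushedOffset; }
    long logEndOffset() { return logEndOffset; }
    int flushCount() { return flushCount; }

    public static void main(String[] args) {
        FlushFlagSketch log = new FlushFlagSketch();
        log.append(1);                    // e.g. a LeaderChange control record
        log.flushLeaderLog(false, false); // explicit flush, regardless of the HWM
        log.append(5);                    // ordinary batches
        log.flushLeaderLog(true, false);  // deferred, nothing is flushed
        log.flushLeaderLog(true, true);   // HWM moved, so flush now
        System.out.println("flushes=" + log.flushCount()
            + ", flushedOffset=" + log.flushedOffset());
    }
}
```

Passing `false` models the control-record path and passing `true` models the batched-append path; how the real code decided that the HWM had moved is not shown in this thread.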
   
   In the latest version of `KafkaRaftClient`, I see that the explicit flush has been removed and `LeaderState` does a force drain instead. That means `flushLeaderLog()` is now needed in only one place.
   My question is this: I can't add the fsync deferral logic directly there, because the drained batches can now include records that need an explicit fsync, such as control records. So how do I distinguish between the two?
   
   
   
   





[GitHub] [kafka] vamossagar12 commented on pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#issuecomment-803731332


   @jsancio, @hachikuji, I have made the changes discussed here. This PR is ready for review, so please review it whenever you get the chance.





[GitHub] [kafka] jsancio commented on a change in pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#discussion_r593419637



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1876,7 +1876,7 @@ private long maybeAppendBatches(
                     BatchAccumulator.CompletedBatch<T> batch = iterator.next();
                     appendBatch(state, batch, currentTimeMs);
                 }
-                flushLeaderLog(state, currentTimeMs);
+                //flushLeaderLog(state, currentTimeMs);

Review comment:
       The invariant that the leader must satisfy is `highWatermark <= flushOffset`. The current implementation satisfies this by flushing after every append, which implicitly defines `flushOffset == logEndOffset`. At a high level, I think the goal is to allow `highWatermark <= flushOffset <= logEndOffset`.
   
   On the follower, things are a little different. On the follower, `flushOffset == logEndOffset` must hold before a `Fetch` request can be sent. This is because the leader assumes that the fetch offset in the `Fetch` request is the offset that the follower has successfully replicated.
   
   The advantage of appending as soon as possible, without flushing, is lower replication latency. The leader cannot replicate record batches to the followers and observers until they have been appended to the log.
   
   I am not sure exactly how we want to implement this since I haven't looked at the details, but I think you are correct that on the leader side we want to increase the `flushOffset` in the `Fetch` request handling code as the leader attempts to increase the high-watermark. 
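A minimal sketch of the leader-side invariant `highWatermark <= flushOffset <= logEndOffset`, assuming a simplified in-memory model. `LeaderOffsetsSketch` and all of its methods are hypothetical names for illustration and do not correspond to the actual `LeaderState` or `ReplicatedLog` API.

```java
// Hypothetical model of the three leader offsets; not real Kafka code.
final class LeaderOffsetsSketch {
    private long highWatermark = 0L;
    private long flushOffset = 0L;
    private long logEndOffset = 0L;
    private int flushCalls = 0;

    // Appending advances only the log end offset, so it never needs a flush.
    void append(long recordCount) {
        if (recordCount < 0) {
            throw new IllegalArgumentException("recordCount must be >= 0");
        }
        logEndOffset += recordCount;
        checkInvariant();
    }

    // Called when the fetch handling has determined a candidate high watermark.
    void advanceHighWatermark(long newHighWatermark) {
        if (newHighWatermark > logEndOffset) {
            throw new IllegalArgumentException("high watermark beyond log end");
        }
        if (newHighWatermark <= highWatermark) {
            return; // the high watermark never moves backwards in this model
        }
        if (newHighWatermark > flushOffset) {
            flush(); // flush first so that highWatermark <= flushOffset keeps holding
        }
        highWatermark = newHighWatermark;
        checkInvariant();
    }

    private void flush() {
        flushOffset = logEndOffset; // stands in for an fsync of the log
        flushCalls++;
    }

    private void checkInvariant() {
        if (!(highWatermark <= flushOffset && flushOffset <= logEndOffset)) {
            throw new IllegalStateException("offset invariant violated");
        }
    }

    long highWatermark() { return highWatermark; }
    long flushOffset() { return flushOffset; }
    long logEndOffset() { return logEndOffset; }
    int flushCalls() { return flushCalls; }
}
```

In this model the flush is driven by the high-watermark update rather than by the append, which is the deferral being discussed; the follower-side requirement (`flushOffset == logEndOffset` before fetching) is deliberately left out.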







[GitHub] [kafka] jsancio commented on a change in pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#discussion_r596219371



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1876,7 +1876,7 @@ private long maybeAppendBatches(
                     BatchAccumulator.CompletedBatch<T> batch = iterator.next();
                     appendBatch(state, batch, currentTimeMs);
                 }
-                flushLeaderLog(state, currentTimeMs);
+                //flushLeaderLog(state, currentTimeMs);

Review comment:
       We are tracking the LEO in two places:
   
   1. In `ReplicatedLog::endOffset`. This increases every time the log is appended to: https://github.com/apache/kafka/blob/28ee656081d5b7984c324c3ea3fc9c34614d17db/core/src/main/scala/kafka/log/Log.scala#L1302
   2. The `LeaderState` also stores what is now the LEO. One suggestion is for `LeaderState` to instead store the "flush offsets". In `LeaderState` the follower's flush offset is the LEO but for the local replica the "flush offset" may not be the LEO.
   
   An example of the high-watermark increasing but the LEO not changing:
   1. follower: LEO = 10
   2. leader: LEO = 100, FlushOffset = 100, HW = 0
   
   Follower successfully fetches for offset 10 => Leader: LEO = 100, FlushOffset = 100, HW = 10.
   Follower successfully fetches for offset 20 => Leader: LEO = 100, FlushOffset = 100, HW = 20.
   
   In this example if the leader already flushed to the LEO then there is no need to flush again when increasing the HW.
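The example above can be replayed with a small helper. This is only an illustrative sketch: `FlushReplaySketch` and its methods are hypothetical, and it assumes, as in the example, that each follower fetch offset directly becomes the new high-watermark.

```java
// Hypothetical replay of the example; not real Kafka code.
final class FlushReplaySketch {
    // A flush is needed only if the new high-watermark would pass the flush offset.
    static boolean needsFlush(long newHighWatermark, long flushOffset) {
        return newHighWatermark > flushOffset;
    }

    // Replays follower fetch offsets and returns how many flushes the leader performs.
    static int countFlushes(long logEndOffset, long flushOffset, long[] followerFetchOffsets) {
        int flushes = 0;
        long highWatermark = 0L;
        for (long fetchOffset : followerFetchOffsets) {
            long newHighWatermark = Math.min(fetchOffset, logEndOffset);
            if (newHighWatermark <= highWatermark) {
                continue; // the high-watermark did not advance
            }
            if (needsFlush(newHighWatermark, flushOffset)) {
                flushOffset = logEndOffset; // simulated flush up to the LEO
                flushes++;
            }
            highWatermark = newHighWatermark;
        }
        return flushes;
    }

    public static void main(String[] args) {
        // Leader: LEO = 100, FlushOffset = 100; follower fetches at 10, then 20.
        System.out.println(countFlushes(100L, 100L, new long[] {10L, 20L}));
    }
}
```

With `FlushOffset = 100` neither fetch triggers a flush. If the leader had not flushed at all (`FlushOffset = 0`), only the first fetch would trigger one.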







[GitHub] [kafka] vamossagar12 commented on pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#issuecomment-798829853


   > @vamossagar12 this PR only contains one commit that comments out `flushLeaderLog` is that intentional or is the PR missing other changes?
   
   Yeah @jsancio , this is just a draft PR that I had created to outline the approach that I intend to follow. Sorry for the confusion.





[GitHub] [kafka] vamossagar12 commented on pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#issuecomment-812492705


   @jsancio, could you please take a look at this PR whenever you get the chance?





[GitHub] [kafka] vamossagar12 commented on a change in pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#discussion_r597054403



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1876,7 +1876,7 @@ private long maybeAppendBatches(
                     BatchAccumulator.CompletedBatch<T> batch = iterator.next();
                     appendBatch(state, batch, currentTimeMs);
                 }
-                flushLeaderLog(state, currentTimeMs);
+                //flushLeaderLog(state, currentTimeMs);

Review comment:
       Got it. So essentially you are saying that within `onUpdateLeaderHighWatermark` I can check whether the HWM moved because of an increase in the LEO (the LEO at the last flush can be stored for this purpose) or for some other reason. So we do not flush merely because the HWM advanced; we add a second condition that the LEO must also have moved since the last flush.
   
   This makes sense to me now. I will start making the changes.







[GitHub] [kafka] vamossagar12 commented on a change in pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#discussion_r589561309



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1876,7 +1876,7 @@ private long maybeAppendBatches(
                     BatchAccumulator.CompletedBatch<T> batch = iterator.next();
                     appendBatch(state, batch, currentTimeMs);
                 }
-                flushLeaderLog(state, currentTimeMs);
+                //flushLeaderLog(state, currentTimeMs);

Review comment:
       @hachikuji, I created this draft PR to outline my understanding. I commented out this line based on my understanding of this statement in KIP-595:
   
   ```Note one key difference of this internal topic compared with other topics is that we should always enforce fsync upon appending to local log to guarantee Raft algorithm correctness, i.e. we are dropping group flushing for this topic```
   
   So, this is the point where, after appending to the leader's local log, the leader log is always flushed. 
   
   The place where we need to make the change would be:
   
   ` if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
                       onUpdateLeaderHighWatermark(state, currentTimeMs);
                   }` in tryCompleteFetchRequest. 
   I think that is because `state.updateReplicaState` returns true only if, after handling the fetch request from a given replica and updating its end offset, the HWM could be updated, i.e. majority - 1 replicas besides the leader have reached a certain offset. So, going by the description in the ticket, this could be the spot where the fsync is performed instead of doing it on every append.
   
   Please let me know if I am thinking in the right direction.
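To illustrate the majority condition mentioned above, here is a hedged sketch of how a high watermark can be derived from the voters' end offsets. `HighWatermarkSketch` is a hypothetical helper, not the `LeaderState` implementation, and it ignores any epoch-related conditions the real code may apply.

```java
import java.util.Arrays;

// Hypothetical helper; not the real LeaderState logic.
final class HighWatermarkSketch {
    /**
     * Returns the largest offset that a majority of voters (leader included)
     * have reached, given each voter's end offset.
     */
    static long highWatermark(long[] voterEndOffsets) {
        if (voterEndOffsets.length == 0) {
            throw new IllegalArgumentException("at least one voter is required");
        }
        long[] sorted = voterEndOffsets.clone();
        Arrays.sort(sorted); // ascending order
        int majority = sorted.length / 2 + 1;
        // The majority-th largest end offset is reached by at least `majority` voters.
        return sorted[sorted.length - majority];
    }

    public static void main(String[] args) {
        // Leader at 100, one follower at 10, one follower at 0.
        System.out.println(highWatermark(new long[] {100L, 10L, 0L}));
    }
}
```

With three voters, a single follower catching up to an offset is enough to move the high watermark, since that follower and the leader together form a majority.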







[GitHub] [kafka] jsancio commented on pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#issuecomment-828720442


   @vamossagar12 Can you add a description to the PR? It is helpful when reviewing this PR to know what strategy you are using to manage the flush offset and log flush.





[GitHub] [kafka] vamossagar12 commented on a change in pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#discussion_r593840882



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1876,7 +1876,7 @@ private long maybeAppendBatches(
                     BatchAccumulator.CompletedBatch<T> batch = iterator.next();
                     appendBatch(state, batch, currentTimeMs);
                 }
-                flushLeaderLog(state, currentTimeMs);
+                //flushLeaderLog(state, currentTimeMs);

Review comment:
       Thanks for the explanation.
   
   So, the original Raft approach, as explained in KIP-595, is push-based: the leader pushes new records to the followers on its own. In Kafka, replication uses a pull model in which the followers use the Fetch API to request any new writes that might have arrived; the Fetch request also doubles as the heartbeat.
   
   Here's the way I thought we can satisfy the invariants that you had mentioned:
   
   1. We remove the `flushLeaderLog` method call in `maybeAppendBatches` in KafkaRaftClient. The leader can keep appending batches as and when it sees fit.
   2. When the leader receives a Fetch request from any of the followers, `tryCompleteFetchRequest` updates the replica state via `updateReplicaState` and already checks whether the high watermark can be moved. This happens here: 
   
   ```
   if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { 
       onUpdateLeaderHighWatermark(state, currentTimeMs); 
   }
   ```
   
   So, I think we can move the `flushLeaderLog` call to this place, where we are sure after handling the Fetch request that the HWM has moved. 
   
   With this approach, I think that even if the current leader dies after noticing that the HWM can be moved but before or during flushing its log, the next leader election would ensure that the newly elected leader has the writes. This is because the followers flush their log immediately after receiving a FetchResponse.
   







[GitHub] [kafka] vamossagar12 commented on pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#issuecomment-822622340


   Hey @jsancio / @hachikuji, could you please review the PR whenever you get the chance?





[GitHub] [kafka] vamossagar12 commented on a change in pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#discussion_r596185719



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1876,7 +1876,7 @@ private long maybeAppendBatches(
                     BatchAccumulator.CompletedBatch<T> batch = iterator.next();
                     appendBatch(state, batch, currentTimeMs);
                 }
-                flushLeaderLog(state, currentTimeMs);
+                //flushLeaderLog(state, currentTimeMs);

Review comment:
       @jsancio, in the current implementation, `flushLeaderLog` first updates the LEO and then flushes. 
   
   ```
   private void flushLeaderLog(LeaderState state, long currentTimeMs) {
           // We update the end offset before flushing so that parked fetches can return sooner
           updateLeaderEndOffsetAndTimestamp(state, currentTimeMs);
           log.flush();
   }
   ```
   
   `updateLeaderEndOffsetAndTimestamp` updates the LEO and then checks whether the HWM can be moved. So, can you help me with an example of how `It is possible for the high-watermark to increase without the LEO increasing.`? 
   
   Also, I wanted to know how that would lead to unnecessary flush calls.







[GitHub] [kafka] jsancio commented on a change in pull request #10278: KAFKA-10526: leader fsync deferral on write

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#discussion_r593988386



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1876,7 +1876,7 @@ private long maybeAppendBatches(
                     BatchAccumulator.CompletedBatch<T> batch = iterator.next();
                     appendBatch(state, batch, currentTimeMs);
                 }
-                flushLeaderLog(state, currentTimeMs);
+                //flushLeaderLog(state, currentTimeMs);

Review comment:
       I think it makes sense to move this flush to `KafkaRaftClient::onUpdateLeaderHighWatermark`.
   
   It is possible for the high-watermark to increase without the LEO increasing. To avoid unnecessary flush calls, the leader should remember in `LeaderState` the LEO when it flushed the log.






