Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/13 07:15:22 UTC

[GitHub] [kafka] feyman2016 opened a new pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

feyman2016 opened a new pull request #9739:
URL: https://github.com/apache/kafka/pull/9739


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r559082654



##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -599,12 +599,12 @@ class LogCleanerTest {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     val appendFirstTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch,
-      origin = AppendOrigin.Replication)
+      origin = AppendOrigin.Coordinator)

Review comment:
       Reverted since we have a newly added `AppendOrigin.RaftLeader`.





----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#issuecomment-770938213


   Fixed `KafkaMetadataLogTest`; let's see if Jenkins still complains.


----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r558935669



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1050,7 +1050,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,

Review comment:
       fixed




----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#issuecomment-744203705


   @hachikuji Requesting a review, thanks!
   BTW, the failed tests `kafka.server.ControllerMutationQuotaTest.testUnboundedDeleteTopicsRequest` and `org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]` don't seem to be related; both pass locally.


----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r561587550



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -180,7 +179,7 @@ private void startNewBatch() {
                 nextOffset,
                 time.milliseconds(),
                 false,
-                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                epoch,

Review comment:
       Let me check




----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#issuecomment-762083498


   @hachikuji @dengziming Requesting a second review. The 7 failed tests pass locally and should be unrelated to this PR. Thanks!


----------------------------------------------------------------



[GitHub] [kafka] hachikuji commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r563941848



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1057,7 +1057,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
+    val validateAndAssignOffsets = origin != AppendOrigin.RaftLeader
+    append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, ignoreRecordSize = false)

Review comment:
       The call to `analyzeAndValidateRecords` is done for both leader and follower appends. Basically, we do a shallow iteration over the batches to collect some information and validate the CRC. My thought was to pass `leaderEpoch` into `analyzeAndValidateRecords` and add a basic check like this:
   ```scala
   records.batches.forEach { batch =>
     ...
     if (origin == AppendOrigin.RaftLeader && batch.partitionLeaderEpoch != leaderEpoch) {
       throw new InvalidRecordException("Append from Raft leader did not set the batch epoch correctly")
     }
   }
   ```
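A minimal Java sketch of the proposed per-batch check, assuming a simplified model. `Batch` is a hypothetical stand-in for a record batch that exposes only its base offset and partition leader epoch, `AppendOrigin` is a hypothetical enum mirroring the origins named in this thread, and `IllegalStateException` stands in for `InvalidRecordException`. This is not the actual `analyzeAndValidateRecords` code.

```java
import java.util.List;

// Hypothetical Java model of the append origins discussed in this PR.
enum AppendOrigin { CLIENT, COORDINATOR, REPLICATION, RAFT_LEADER }

// Hypothetical stand-in for a record batch, reduced to the fields the check needs.
final class Batch {
    final long baseOffset;
    final int partitionLeaderEpoch;

    Batch(long baseOffset, int partitionLeaderEpoch) {
        this.baseOffset = baseOffset;
        this.partitionLeaderEpoch = partitionLeaderEpoch;
    }
}

final class EpochCheck {
    private EpochCheck() {}

    // Shallow pass over the batches. For Raft leader appends, each batch must
    // already carry the epoch the caller passed in; other origins are not checked.
    static void validateBatches(List<Batch> batches, AppendOrigin origin, int leaderEpoch) {
        for (Batch batch : batches) {
            if (origin == AppendOrigin.RAFT_LEADER && batch.partitionLeaderEpoch != leaderEpoch) {
                // IllegalStateException stands in for Kafka's InvalidRecordException.
                throw new IllegalStateException(
                    "Append from Raft leader did not set the batch epoch correctly: expected "
                        + leaderEpoch + ", found " + batch.partitionLeaderEpoch);
            }
        }
    }
}
```

Because the origin is passed into the validation step, the check only fires for Raft leader appends; client, coordinator, and follower appends go through unchanged in this sketch.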




----------------------------------------------------------------



[GitHub] [kafka] dengziming commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r541885151



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1050,7 +1050,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
+    val assignOffsets = if (origin == AppendOrigin.Replication) false else true

Review comment:
       This can be replaced by `val assignOffsets = if (origin != AppendOrigin.Replication)`




----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#issuecomment-770391030


   @hachikuji Sorry for the delay. I'm on it now and will update it soon.


----------------------------------------------------------------



[GitHub] [kafka] hachikuji merged pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #9739:
URL: https://github.com/apache/kafka/pull/9739


   


----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#issuecomment-771413780


   @hachikuji Thanks for the review and help!


----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r541865977



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1477,6 +1477,7 @@ private void appendBatch(
     ) {
         try {
             int epoch = state.epoch();
+            batch.data.batches().forEach(recordBatch -> recordBatch.setPartitionLeaderEpoch(epoch));

Review comment:
       This is needed for the sanity check in `append()` in `Log.scala`:
   ```scala
   if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
     maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
   }
   ```
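The added line in `appendBatch` stamps the current epoch onto every batch before it reaches the log. A self-contained Java sketch of that pattern, assuming a hypothetical `MutableBatch` class in place of a real mutable record batch (here the epoch is just a field, initialized to -1, the value of `RecordBatch.NO_PARTITION_LEADER_EPOCH`):

```java
import java.util.List;

// Hypothetical stand-in for a mutable record batch.
final class MutableBatch {
    static final int NO_PARTITION_LEADER_EPOCH = -1;
    private int partitionLeaderEpoch = NO_PARTITION_LEADER_EPOCH;

    int partitionLeaderEpoch() { return partitionLeaderEpoch; }

    void setPartitionLeaderEpoch(int epoch) { this.partitionLeaderEpoch = epoch; }
}

final class EpochStamper {
    private EpochStamper() {}

    // Mirrors the diff: overwrite the epoch on every batch so that the log's
    // per-batch epoch bookkeeping sees the current leader epoch instead of -1.
    static void stampEpoch(List<MutableBatch> batches, int epoch) {
        batches.forEach(batch -> batch.setPartitionLeaderEpoch(epoch));
    }
}
```

Elsewhere in this thread, this per-batch stamping was reverted in favor of constructing `BatchBuilder` with the correct epoch, so the header is written correctly in the first place.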




----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#issuecomment-761754986


   @hachikuji @dengziming Addressed the comments; requesting a second review. Thanks a lot!


----------------------------------------------------------------






[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r561587550



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1057,7 +1057,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
+    val validateAndAssignOffsets = origin != AppendOrigin.RaftLeader
+    append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, ignoreRecordSize = false)

Review comment:
       Makes sense to me; it also feels a little odd to me. I put it here because `assignOffsets` is `true` for `appendAsLeader` and `false` for `appendAsFollower`, which means `assignOffsets` is normally determined by the caller, and `RaftLeader` is just a special case of `appendAsLeader`. If we move the logic into `analyzeAndValidateRecords`, it would need to decide whether to assign offsets without knowing the caller. Is that doable?




----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r559082599



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1477,6 +1477,7 @@ private void appendBatch(
     ) {
         try {
             int epoch = state.epoch();
+            batch.data.batches().forEach(recordBatch -> recordBatch.setPartitionLeaderEpoch(epoch));

Review comment:
       @hachikuji I thought the same as you. Re-checking the code, `PARTITION_LEADER_EPOCH_OFFSET` is set in `DefaultRecordBatch#writeHeader`, but the problem is that `BatchBuilder` is always constructed with `epoch=RecordBatch.NO_PARTITION_LEADER_EPOCH`, so I updated that and reverted the change at L1480.
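To illustrate why the builder's constructor argument matters: in the v2 record batch format the header begins with `baseOffset` (int64), `batchLength` (int32), and `partitionLeaderEpoch` (int32), so the epoch sits at byte offset 12. The sketch below uses a hypothetical `writeHeaderPrefix` helper (not Kafka's `DefaultRecordBatch#writeHeader`) that writes only those three fields, showing that whatever epoch the builder was given is exactly what lands in the header.

```java
import java.nio.ByteBuffer;

final class HeaderSketch {
    static final int NO_PARTITION_LEADER_EPOCH = -1;
    // v2 batch header: baseOffset (8 bytes) + batchLength (4 bytes) precede the epoch.
    static final int PARTITION_LEADER_EPOCH_OFFSET = 8 + 4;

    private HeaderSketch() {}

    // Hypothetical helper: writes only the first three v2 header fields.
    static ByteBuffer writeHeaderPrefix(long baseOffset, int batchLength, int partitionLeaderEpoch) {
        ByteBuffer buffer = ByteBuffer.allocate(PARTITION_LEADER_EPOCH_OFFSET + 4);
        buffer.putLong(baseOffset);
        buffer.putInt(batchLength);
        buffer.putInt(partitionLeaderEpoch);
        buffer.flip();
        return buffer;
    }

    // Absolute read at the fixed header offset; does not move the buffer position.
    static int readPartitionLeaderEpoch(ByteBuffer buffer) {
        return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
    }
}
```

A builder constructed with `NO_PARTITION_LEADER_EPOCH` therefore writes -1 into every header it produces, which is the behavior the fix in `BatchAccumulator#startNewBatch` addresses by passing `epoch` instead.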




----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r542362372



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1050,7 +1050,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
+    val assignOffsets = if (origin == AppendOrigin.Replication) false else true

Review comment:
       Got it. I'll wait for @hachikuji's review and then fix everything in one pass, thanks.




----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#issuecomment-770641473


   `KafkaMetadataLogTest` failed because the tests didn't pass the epoch check for appends from `RaftLeader`. I'll check and update tonight.


----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r541864843



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -68,7 +68,7 @@ class KafkaMetadataLog(
 
     val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
       leaderEpoch = epoch,
-      origin = AppendOrigin.Coordinator)
+      origin = AppendOrigin.Replication)

Review comment:
       I intended to add a new `AppendOrigin` called `Leader` for this case, but then thought it implies the same thing as `Replication`, so I just reused it.




----------------------------------------------------------------



[GitHub] [kafka] dengziming commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r542086592



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1050,7 +1050,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
+    val assignOffsets = if (origin == AppendOrigin.Replication) false else true

Review comment:
       Oh, I mean `val assignOffsets = origin != AppendOrigin.Replication`
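In Scala, an `if` without an `else` branch does not yield a `Boolean`, which is why the original line spells out `false else true`; the whole expression is just the negation of the condition. The Java sketch below (hypothetical enum; the ternary plays the role of Scala's `if`/`else` expression) checks that the verbose and simplified forms agree for every origin.

```java
// Hypothetical Java model of the origins relevant to this comparison.
enum AppendOrigin { CLIENT, COORDINATOR, REPLICATION }

final class FlagSimplification {
    private FlagSimplification() {}

    // Verbose form, analogous to `if (origin == AppendOrigin.Replication) false else true`.
    static boolean verbose(AppendOrigin origin) {
        return origin == AppendOrigin.REPLICATION ? false : true;
    }

    // Simplified form, analogous to `origin != AppendOrigin.Replication`.
    static boolean simplified(AppendOrigin origin) {
        return origin != AppendOrigin.REPLICATION;
    }
}
```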




----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r541941894



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1050,7 +1050,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
+    val assignOffsets = if (origin == AppendOrigin.Replication) false else true

Review comment:
       @dengziming Thanks, but it needs `else` for a complete `if` expression




----------------------------------------------------------------






[GitHub] [kafka] feyman2016 commented on pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#issuecomment-759902307


   @hachikuji Thanks for the review, I will address the comments shortly~


----------------------------------------------------------------



[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r559082616



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1050,7 +1050,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
+    val assignOffsets = if (origin == AppendOrigin.Replication) false else true

Review comment:
       Makes sense, updated.




----------------------------------------------------------------



[GitHub] [kafka] hachikuji commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r560602833



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1057,7 +1057,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
+    val validateAndAssignOffsets = origin != AppendOrigin.RaftLeader
+    append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, ignoreRecordSize = false)

Review comment:
       It is a little odd here that we have to pass through `leaderEpoch` even though we expect the Raft leader to have set it already. Perhaps we should be validating it in `analyzeAndValidateRecords`. We can verify for each batch that the leader epoch matches when the append origin is `RaftLeader`. Does that make sense?

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -180,7 +179,7 @@ private void startNewBatch() {
                 nextOffset,
                 time.milliseconds(),
                 false,
-                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                epoch,

Review comment:
       Good catch. Do we have any test cases in `BatchAccumulatorTest` which can be modified to catch this case?




----------------------------------------------------------------



[GitHub] [kafka] hachikuji commented on pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#issuecomment-770267814


   @feyman2016 I can help get this over the line if you don't have time. I think it's just a couple of small changes after rebasing.


----------------------------------------------------------------



[GitHub] [kafka] hachikuji commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r553666938



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1050,7 +1050,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,

Review comment:
       It would be helpful to have a test case which verifies `appendAsLeader` with the new append origin.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -68,7 +68,7 @@ class KafkaMetadataLog(
 
     val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
       leaderEpoch = epoch,
-      origin = AppendOrigin.Coordinator)
+      origin = AppendOrigin.Replication)

Review comment:
       I was going to ask about adding a new `AppendOrigin`. I agree that the behavior should be the same as for `Replication`, but it seems like it could lead to confusion. Maybe we could add an `AppendOrigin.RaftLeader`. Then we can add a simple comment which emphasizes that the Raft leader is responsible for assigning offsets. That would also allow us to revert the changes in `LogCleanerTest`.
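A rough Java rendering of this suggestion, assuming a simplified model: a dedicated `RAFT_LEADER` origin carrying a comment that the Raft leader assigns offsets, with leader appends skipping validation and offset assignment only for that origin. The enum and `LogSketch` are hypothetical; the origin names come from this thread, and the real `append` path takes many more parameters.

```java
// Hypothetical Java model of the append origins discussed in this thread.
enum AppendOrigin {
    CLIENT,
    COORDINATOR,
    REPLICATION,
    /**
     * Appends from the Raft leader. The Raft layer is responsible for assigning
     * offsets and setting the partition leader epoch, so the log does not
     * re-validate or reassign them.
     */
    RAFT_LEADER
}

final class LogSketch {
    private LogSketch() {}

    // Leader appends validate and assign offsets unless the Raft leader already did.
    static boolean validateAndAssignOffsetsForLeader(AppendOrigin origin) {
        return origin != AppendOrigin.RAFT_LEADER;
    }

    // Follower appends keep the offsets chosen by the leader.
    static boolean validateAndAssignOffsetsForFollower() {
        return false;
    }
}
```

Because only `RAFT_LEADER` flips the flag, leader appends with any other origin, including `REPLICATION` as used in `LogCleanerTest`, keep assigning offsets as before.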

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1477,6 +1477,7 @@ private void appendBatch(
     ) {
         try {
             int epoch = state.epoch();
+            batch.data.batches().forEach(recordBatch -> recordBatch.setPartitionLeaderEpoch(epoch));

Review comment:
       Hmm.. Do we need this? I thought we already set leader epoch here: https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java#L256.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1050,7 +1050,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
+    val assignOffsets = if (origin == AppendOrigin.Replication) false else true

Review comment:
       By the way, it seems we might want to rename `assignOffsets` since we also rely on this flag for record validation. It's a bit on the verbose side, but maybe `validateMessagesAndAssignOffsets` to go along with the similarly named method in `LogValidator`?







[GitHub] [kafka] hachikuji commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r553666738



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1050,7 +1050,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
+    val assignOffsets = if (origin == AppendOrigin.Replication) false else true

Review comment:
       By the way, it seems we might want to rename `assignOffsets` since we also rely on this flag for record validation. It's a bit on the verbose side, but maybe `validateAndAssignOffsets` to go along with the similarly named method in `LogValidator`?







[GitHub] [kafka] feyman2016 removed a comment on pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 removed a comment on pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#issuecomment-761754986


   @hachikuji @dengziming I've addressed the comments and would appreciate a second review. Thanks a lot!





[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r541944104



##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -599,12 +599,12 @@ class LogCleanerTest {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     val appendFirstTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch,
-      origin = AppendOrigin.Replication)
+      origin = AppendOrigin.Coordinator)

Review comment:
       With this change, `LogValidator` is bypassed when `Log.appendAsLeader` is called with `AppendOrigin.Replication`, so that path assumes the leader epoch has already been set on the underlying batches of the records being appended. Moreover, the transaction and group coordinators are supposed to call `Log.appendAsLeader` with `origin = AppendOrigin.Coordinator`.
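The invariant above can be illustrated with a small hypothetical check. When the validator is skipped, every batch must already carry the epoch of the append. `BatchHeader` and `mismatchedEpochs` are simplified stand-ins, not Kafka's `RecordBatch` API. The `-1` sentinel mirrors Kafka's `RecordBatch.NO_PARTITION_LEADER_EPOCH`.

```scala
// Hypothetical check of the leader-epoch assumption; not Kafka code.
object LeaderEpochInvariant {
  // A batch header reduced to the two fields that matter here.
  final case class BatchHeader(baseOffset: Long, partitionLeaderEpoch: Int)

  // Mirrors the "no epoch set" sentinel (-1) used for Kafka record batches.
  val NoPartitionLeaderEpoch: Int = -1

  // Batches whose epoch differs from the append epoch. In this model, with
  // validation bypassed, nothing on the append path would overwrite them.
  def mismatchedEpochs(batches: Seq[BatchHeader], leaderEpoch: Int): Seq[BatchHeader] =
    batches.filter(_.partitionLeaderEpoch != leaderEpoch)
}
```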







[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r559096843



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.RecordBatch;

Review comment:
       Fix checkstyle error.







[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r559082063



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -68,7 +68,7 @@ class KafkaMetadataLog(
 
     val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
       leaderEpoch = epoch,
-      origin = AppendOrigin.Coordinator)
+      origin = AppendOrigin.Replication)

Review comment:
       Added an `AppendOrigin.RaftLeader` and reverted the changes in `LogCleanerTest`.







[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r561589867



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1057,7 +1057,8 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
+    val validateAndAssignOffsets = origin != AppendOrigin.RaftLeader
+    append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, ignoreRecordSize = false)

Review comment:
       Makes sense to me; it also looks a little odd to me. I put it here because `assignOffsets` is `true` for `appendAsLeader` and `false` for `appendAsFollower`, so `assignOffsets` is normally determined by the caller, and `RaftLeader` is just a special case of `appendAsLeader`. If we move the logic into `analyzeAndValidateRecords`, that method would need to decide whether to assign offsets without knowing the caller. Is that doable?
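Below is a simplified sketch of the caller-determined approach described above. It assumes a shared private `append` that cannot tell a leader append from a follower append on its own. `appendAsLeader`, `appendAsFollower`, `validateAndAssignOffsets` and the `AppendOrigin` values come from the thread. `CallerDeterminedFlag`, `AppendResult` and the private `append` are hypothetical. `AppendResult` only records the flag instead of writing to a log.

```scala
// Illustrative model of where the flag is decided; not Kafka's Log class.
object CallerDeterminedFlag {
  sealed trait AppendOrigin
  object AppendOrigin {
    case object Client extends AppendOrigin
    case object Replication extends AppendOrigin
    case object RaftLeader extends AppendOrigin
  }

  // Records which path the shared append took; stands in for the real write.
  final case class AppendResult(origin: AppendOrigin, validateAndAssignOffsets: Boolean)

  // Shared append: it has no notion of leader vs. follower, so the caller passes the flag.
  private def append(origin: AppendOrigin, validateAndAssignOffsets: Boolean): AppendResult =
    AppendResult(origin, validateAndAssignOffsets)

  // Leader path: RaftLeader is the only origin that skips validation and offset assignment.
  def appendAsLeader(origin: AppendOrigin = AppendOrigin.Client): AppendResult =
    append(origin, validateAndAssignOffsets = origin != AppendOrigin.RaftLeader)

  // Follower path: offsets were already assigned by the leader.
  def appendAsFollower(): AppendResult =
    append(AppendOrigin.Replication, validateAndAssignOffsets = false)
}
```

Moving the decision into `analyzeAndValidateRecords` would mean passing that method the same leader/follower information. That trade-off is the one the comment raises.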



