Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/29 04:35:17 UTC

[GitHub] [kafka] dengziming commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions

dengziming commented on code in PR #13033:
URL: https://github.com/apache/kafka/pull/13033#discussion_r1058721483


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -686,7 +687,15 @@ public Long apply(List<ApiMessageAndVersion> records) {
                             int i = 1;
                             for (ApiMessageAndVersion message : records) {
                                 try {
-                                    replay(message.message(), Optional.empty(), prevEndOffset + records.size());

Review Comment:
   Here we were using `prevEndOffset + records.size()` to make it consistent with the code in `handleCommit` and `handleSnapshot`.
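
   To make the offset arithmetic concrete, here is a minimal sketch. It assumes `prevEndOffset` is the offset of the last record before the batch; `BatchOffsets` and its methods are hypothetical names for illustration only, not part of `QuorumController`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the Kafka code base.
final class BatchOffsets {
    // Assumption: prevEndOffset is the offset of the last record before this batch,
    // so the batch occupies prevEndOffset + 1 .. prevEndOffset + recordCount.
    static long batchLastOffset(long prevEndOffset, int recordCount) {
        return prevEndOffset + recordCount;
    }

    // Per-record offsets under the same assumption, for comparison with the
    // single batch-level offset that the removed line passed for every record.
    static List<Long> perRecordOffsets(long prevEndOffset, int recordCount) {
        List<Long> offsets = new ArrayList<>(recordCount);
        for (int i = 1; i <= recordCount; i++) {
            offsets.add(prevEndOffset + i);
        }
        return offsets;
    }
}
```

   Under this assumption every record in the batch was replayed with the same value, the offset of the batch's last record, rather than its own offset.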



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1293,15 +1358,20 @@ private void handleFeatureControlChange() {
      *
      * @param message           The metadata record
      * @param snapshotId        The snapshotId if this record is from a snapshot
-     * @param batchLastOffset   The offset of the last record in the log batch, or the lastContainedLogOffset
-     *                          if this record is from a snapshot, this is used along with RegisterBrokerRecord
+     * @param offset            The record offset or snapshot offset.
+     * @param epoch             The log epoch of the record or snapshot offset.
      */
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long batchLastOffset) {
+    private void replay(
+        ApiMessage message,
+        Optional<OffsetAndEpoch> snapshotId,
+        long offset,
+        int epoch
+    ) {
         logReplayTracker.replay(message);
         MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
         switch (type) {
             case REGISTER_BROKER_RECORD:
-                clusterControl.replay((RegisterBrokerRecord) message, batchLastOffset);
+                clusterControl.replay((RegisterBrokerRecord) message, offset);

Review Comment:
   Currently, we use this offset to remember the offset of each `RegisterBrokerRecord`, so that we can unfence a broker once its high watermark has passed its `RegisterBrokerRecord`.
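
   A minimal sketch of that bookkeeping follows. `RegistrationOffsetTracker` and its methods are hypothetical names, not the actual `ClusterControlManager` API, and the strict comparison against the high watermark is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the bookkeeping described above;
// the real logic lives in the controller and may differ.
final class RegistrationOffsetTracker {
    // Broker id -> offset passed to replay() for its registration record.
    private final Map<Integer, Long> registrationOffsets = new HashMap<>();

    // Called when a registration record for brokerId is replayed.
    void recordRegistration(int brokerId, long offset) {
        registrationOffsets.put(brokerId, offset);
    }

    // Assumption: a broker may be unfenced once the high watermark it reports
    // is strictly greater than the offset stored for its registration record.
    boolean canUnfence(int brokerId, long brokerHighWatermark) {
        Long registrationOffset = registrationOffsets.get(brokerId);
        return registrationOffset != null && brokerHighWatermark > registrationOffset;
    }
}
```

   If the stored value is a batch-level offset rather than the record's own offset, the check becomes more conservative, but it should still only pass after the registration record itself has been passed.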



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1349,6 +1419,15 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon
             case NO_OP_RECORD:
                 // NoOpRecord is an empty record and doesn't need to be replayed
                 break;
+            case BEGIN_TRANSACTION_RECORD:
+                beginTransaction(offset, epoch);

Review Comment:
   This is not the real offset when the record is from a snapshot. Could that cause a problem here?


