Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/03 17:38:21 UTC

[GitHub] [kafka] jsancio opened a new pull request #10253: KAFKA-12376: Apply atomic append to the log

jsancio opened a new pull request #10253:
URL: https://github.com/apache/kafka/pull/10253


   Append to the log in one batch when handling:
   
   1. Client quota changes
   2. Configuration changes
   3. Feature changes
   4. Topic creation
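
One way to picture "append in one batch" is the sketch below. It is not Kafka's implementation: `BatchLogSketch`, `maxBatchSize`, and the splitting rule are hypothetical, and records are plain strings. It only illustrates the assumed difference between a write that may be split across batches and an atomic write that always lands in a single batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory log; not Kafka's MetaLogManager or LocalLogManager.
final class BatchLogSketch {
    // Assumed size limit that forces a non-atomic write to be split.
    private final int maxBatchSize;
    private final List<List<String>> batches = new ArrayList<>();

    BatchLogSketch(int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
    }

    // Non-atomic write: records may end up in several batches.
    void scheduleWrite(List<String> records) {
        for (int i = 0; i < records.size(); i += maxBatchSize) {
            int end = Math.min(i + maxBatchSize, records.size());
            batches.add(new ArrayList<>(records.subList(i, end)));
        }
    }

    // Atomic write: all records are stored as exactly one batch.
    void scheduleAtomicWrite(List<String> records) {
        batches.add(new ArrayList<>(records));
    }

    int batchCount() {
        return batches.size();
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("topic", "partition-0", "partition-1");

        BatchLogSketch split = new BatchLogSketch(2);
        split.scheduleWrite(records);

        BatchLogSketch atomic = new BatchLogSketch(2);
        atomic.scheduleAtomicWrite(records);

        System.out.println(split.batchCount());  // prints 2
        System.out.println(atomic.batchCount()); // prints 1
    }
}
```

In this sketch, a reader of the log could observe the topic record without its partition records after the split write, which is the kind of partial state a single batch avoids.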
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #10253: KAFKA-12376: Apply atomic append to the log

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10253:
URL: https://github.com/apache/kafka/pull/10253#discussion_r586814226



##########
File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -328,8 +328,21 @@ public void register(MetaLogListener listener) throws Exception {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(

Review comment:
       In general, I agree but can we keep this reformatting? I found the old formatting hard to read and technically we are adding a new method called `scheduleAtomicWrite`.







[GitHub] [kafka] jsancio commented on a change in pull request #10253: KAFKA-12376: Apply atomic append to the log

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10253:
URL: https://github.com/apache/kafka/pull/10253#discussion_r587635154



##########
File path: metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
##########
@@ -50,13 +50,30 @@
      * offset before renouncing its leadership.  The listener should determine this by
      * monitoring the committed offsets.
      *
-     * @param epoch         The controller epoch.
-     * @param batch         The batch of messages to write.
+     * @param epoch         the controller epoch
+     * @param batch         the batch of messages to write

Review comment:
       These are phrases, hence no period. For example, none of the javadoc for the Java library ends them with periods.
   
   I only modified the javadoc that I needed to update to not distract from the PR. We should do another pass and fix all of the comments.







[GitHub] [kafka] tombentley commented on a change in pull request #10253: KAFKA-12376: Apply atomic append to the log

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #10253:
URL: https://github.com/apache/kafka/pull/10253#discussion_r587534836



##########
File path: metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
##########
@@ -50,13 +50,30 @@
      * offset before renouncing its leadership.  The listener should determine this by
      * monitoring the committed offsets.
      *
-     * @param epoch         The controller epoch.
-     * @param batch         The batch of messages to write.
+     * @param epoch         the controller epoch
+     * @param batch         the batch of messages to write

Review comment:
       The other methods in the class consistently end `@param` with a period. 







[GitHub] [kafka] mumrah commented on a change in pull request #10253: KAFKA-12376: Apply atomic append to the log

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10253:
URL: https://github.com/apache/kafka/pull/10253#discussion_r586756040



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
##########
@@ -28,15 +28,21 @@
 class ControllerResult<T> {
     private final List<ApiMessageAndVersion> records;
     private final T response;
+    private final boolean isAtomic;
 
     public ControllerResult(T response) {
         this(new ArrayList<>(), response);
     }
 
     public ControllerResult(List<ApiMessageAndVersion> records, T response) {
+        this(records, response, false);
+    }
+
+    public ControllerResult(List<ApiMessageAndVersion> records, T response, boolean isAtomic) {

Review comment:
       This is probably just my style bias, but I wonder if it would be easier (and more readable) to use static factories for atomic vs non-atomic ControllerResult. Something like:
   
   ```java
   ControllerResult.newAtomicResult(records, response)
   // and 
   ControllerResult.newResult(records, response)
   ```
   
   Boolean flags are such a pain and easy to mess up. I actually think it might be nice if there are no public constructors for ControllerResult and we use factories for everything. However, this would be a bigger change, so I'm fine if we defer it (if we even decide we need it).
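
A minimal sketch of the factory idea in the comment above, assuming a simplified class: `ControllerResultSketch` is a hypothetical stand-in, records are plain strings instead of `ApiMessageAndVersion`, and the factory names are the ones proposed in the comment. The boolean flag stays private, so call sites name their intent instead of passing `true` or `false`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for ControllerResult.
final class ControllerResultSketch<T> {
    private final List<String> records;
    private final T response;
    private final boolean isAtomic;

    // Private constructor: callers must use one of the named factories.
    private ControllerResultSketch(List<String> records, T response, boolean isAtomic) {
        this.records = Collections.unmodifiableList(new ArrayList<>(records));
        this.response = response;
        this.isAtomic = isAtomic;
    }

    // Records may be appended to the log in more than one batch.
    static <T> ControllerResultSketch<T> newResult(List<String> records, T response) {
        return new ControllerResultSketch<>(records, response, false);
    }

    // Records must be appended to the log as a single batch.
    static <T> ControllerResultSketch<T> newAtomicResult(List<String> records, T response) {
        return new ControllerResultSketch<>(records, response, true);
    }

    List<String> records() {
        return records;
    }

    T response() {
        return response;
    }

    boolean isAtomic() {
        return isAtomic;
    }

    public static void main(String[] args) {
        ControllerResultSketch<String> atomic =
            ControllerResultSketch.newAtomicResult(Arrays.asList("r1", "r2"), "ok");
        ControllerResultSketch<String> plain =
            ControllerResultSketch.newResult(Arrays.asList("r1"), "ok");
        System.out.println(atomic.isAtomic()); // prints true
        System.out.println(plain.isAtomic());  // prints false
    }
}
```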

##########
File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -328,8 +328,21 @@ public void register(MetaLogListener listener) throws Exception {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(

Review comment:
       nit: since we're just moving this code, can we avoid reformatting it?

##########
File path: metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
##########
@@ -135,18 +135,43 @@ public void testIncrementalAlterConfigs() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ConfigurationControlManager manager =
             new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS);
-        assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Collections.singletonList(
-            new ApiMessageAndVersion(new ConfigRecord().
-                setResourceType(TOPIC.id()).setResourceName("mytopic").
-                setName("abc").setValue("123"), (short) 0)),
-            toMap(entry(BROKER0, new ApiError(
-                Errors.INVALID_REQUEST, "A DELETE op was given with a non-null value.")),
-                entry(MYTOPIC, ApiError.NONE))),
-            manager.incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
-                entry("foo.bar", entry(DELETE, "abc")),
-                entry("quux", entry(SET, "abc")))),
-            entry(MYTOPIC, toMap(
-                entry("abc", entry(APPEND, "123")))))));
+        assertEquals(

Review comment:
       Is any code changing in here, or is it just reformatted?

##########
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -371,8 +371,21 @@ public void register(MetaLogListener listener) throws Exception {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {

Review comment:
       Same comment for the other LocalLogManager. Hmm 🤔 why do we have two of these?

##########
File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -328,8 +328,21 @@ public void register(MetaLogListener listener) throws Exception {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(
+            nodeId,
+            leader.epoch(),
+            new LocalRecordBatch(
+                batch
+                    .stream()
+                    .map(r -> r.message())

Review comment:
       nit: method reference can be used here
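
For illustration, the lambda and the method reference are interchangeable in a `map` call like this one. The sketch below assumes a hypothetical `VersionedMessage` class standing in for `ApiMessageAndVersion`; only its `message()` accessor matters here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class MethodReferenceSketch {
    // Hypothetical stand-in for ApiMessageAndVersion.
    static final class VersionedMessage {
        private final String message;

        VersionedMessage(String message) {
            this.message = message;
        }

        String message() {
            return message;
        }
    }

    // Lambda form, as in the diff under review.
    static List<String> withLambda(List<VersionedMessage> batch) {
        return batch.stream().map(r -> r.message()).collect(Collectors.toList());
    }

    // Method-reference form suggested by the nit.
    static List<String> withMethodReference(List<VersionedMessage> batch) {
        return batch.stream().map(VersionedMessage::message).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<VersionedMessage> batch =
            Arrays.asList(new VersionedMessage("a"), new VersionedMessage("b"));
        System.out.println(withLambda(batch).equals(withMethodReference(batch))); // prints true
    }
}
```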







[GitHub] [kafka] jsancio commented on a change in pull request #10253: KAFKA-12376: Apply atomic append to the log

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10253:
URL: https://github.com/apache/kafka/pull/10253#discussion_r586814323



##########
File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -328,8 +328,21 @@ public void register(MetaLogListener listener) throws Exception {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(
+            nodeId,
+            leader.epoch(),
+            new LocalRecordBatch(
+                batch
+                    .stream()
+                    .map(r -> r.message())

Review comment:
       Done.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
##########
@@ -28,15 +28,21 @@
 class ControllerResult<T> {
     private final List<ApiMessageAndVersion> records;
     private final T response;
+    private final boolean isAtomic;
 
     public ControllerResult(T response) {
         this(new ArrayList<>(), response);
     }
 
     public ControllerResult(List<ApiMessageAndVersion> records, T response) {
+        this(records, response, false);
+    }
+
+    public ControllerResult(List<ApiMessageAndVersion> records, T response, boolean isAtomic) {

Review comment:
       I agree with your style preference. Changed.

##########
File path: metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
##########
@@ -135,18 +135,43 @@ public void testIncrementalAlterConfigs() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ConfigurationControlManager manager =
             new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS);
-        assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Collections.singletonList(
-            new ApiMessageAndVersion(new ConfigRecord().
-                setResourceType(TOPIC.id()).setResourceName("mytopic").
-                setName("abc").setValue("123"), (short) 0)),
-            toMap(entry(BROKER0, new ApiError(
-                Errors.INVALID_REQUEST, "A DELETE op was given with a non-null value.")),
-                entry(MYTOPIC, ApiError.NONE))),
-            manager.incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
-                entry("foo.bar", entry(DELETE, "abc")),
-                entry("quux", entry(SET, "abc")))),
-            entry(MYTOPIC, toMap(
-                entry("abc", entry(APPEND, "123")))))));
+        assertEquals(

Review comment:
       Yes. It should be clearer now that we have `ControllerResult::atomicOf`







[GitHub] [kafka] jsancio commented on a change in pull request #10253: KAFKA-12376: Apply atomic append to the log

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10253:
URL: https://github.com/apache/kafka/pull/10253#discussion_r586813520



##########
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -371,8 +371,21 @@ public void register(MetaLogListener listener) throws Exception {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {

Review comment:
       Yeah. I noticed this too. Not clear if this was on purpose. I say we fix this in a future PR.







[GitHub] [kafka] jsancio commented on a change in pull request #10253: KAFKA-12376: Apply atomic append to the log

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10253:
URL: https://github.com/apache/kafka/pull/10253#discussion_r587664832



##########
File path: metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
##########
@@ -101,32 +101,55 @@ public void testUpdateFeaturesErrorCases() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         FeatureControlManager manager = new FeatureControlManager(
             rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
-        assertEquals(new ControllerResult<>(Collections.
-                singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Broker 5 does not support the given feature range."))),
-            manager.updateFeatures(rangeMap("foo", 1, 3),
+
+        assertEquals(
+            ControllerResult.of(
+                Collections.emptyList(),
+                Collections.singletonMap(
+                    "foo",
+                    new ApiError(
+                        Errors.INVALID_UPDATE_VERSION,
+                        "Broker 5 does not support the given feature range."
+                    )
+                )
+            ),
+            manager.updateFeatures(
+                rangeMap("foo", 1, 3),
                 new HashSet<>(Arrays.asList("foo")),

Review comment:
       Yep. Fixed throughout the file.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
##########
@@ -69,7 +69,12 @@
             results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
                 downgradeables.contains(entry.getKey()), brokerFeatures, records));
         }
-        return new ControllerResult<>(records, results);
+
+        if (records.isEmpty()) {

Review comment:
       I don't have a good reason. I felt it was consistent that an empty set of records was by definition not atomic. I removed the conditional and always create an atomic `ControllerResult`.







[GitHub] [kafka] mumrah commented on a change in pull request #10253: KAFKA-12376: Apply atomic append to the log

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10253:
URL: https://github.com/apache/kafka/pull/10253#discussion_r587598524



##########
File path: metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
##########
@@ -69,7 +69,12 @@
             results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
                 downgradeables.contains(entry.getKey()), brokerFeatures, records));
         }
-        return new ControllerResult<>(records, results);
+
+        if (records.isEmpty()) {

Review comment:
       Any reason why we are calling the non-atomic method for an empty record set? Seems like we don't bother doing this in the other control manager classes. Can we just do `ControllerResult.atomicOf(records, results)` regardless of empty records?







[GitHub] [kafka] mumrah commented on a change in pull request #10253: KAFKA-12376: Apply atomic append to the log

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10253:
URL: https://github.com/apache/kafka/pull/10253#discussion_r587595181



##########
File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -328,8 +328,21 @@ public void register(MetaLogListener listener) throws Exception {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(

Review comment:
       Yeah, sounds good. I actually agree it's more readable now.







[GitHub] [kafka] mumrah merged pull request #10253: KAFKA-12376: Apply atomic append to the log

Posted by GitBox <gi...@apache.org>.
mumrah merged pull request #10253:
URL: https://github.com/apache/kafka/pull/10253


   





[GitHub] [kafka] tombentley commented on a change in pull request #10253: KAFKA-12376: Apply atomic append to the log

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #10253:
URL: https://github.com/apache/kafka/pull/10253#discussion_r587533515



##########
File path: metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
##########
@@ -101,32 +101,55 @@ public void testUpdateFeaturesErrorCases() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         FeatureControlManager manager = new FeatureControlManager(
             rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
-        assertEquals(new ControllerResult<>(Collections.
-                singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Broker 5 does not support the given feature range."))),
-            manager.updateFeatures(rangeMap("foo", 1, 3),
+
+        assertEquals(
+            ControllerResult.of(
+                Collections.emptyList(),
+                Collections.singletonMap(
+                    "foo",
+                    new ApiError(
+                        Errors.INVALID_UPDATE_VERSION,
+                        "Broker 5 does not support the given feature range."
+                    )
+                )
+            ),
+            manager.updateFeatures(
+                rangeMap("foo", 1, 3),
                 new HashSet<>(Arrays.asList("foo")),

Review comment:
       `Collections.singleton()`?
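
As a small illustration of the suggestion, the two expressions below build equal one-element sets. The class and method names are hypothetical. Note one difference: `Collections.singleton` returns an immutable set, which is fine for a test argument that is only read.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class SingletonSetSketch {
    // Form used in the test under review: a mutable HashSet with one element.
    static Set<String> viaHashSet() {
        return new HashSet<>(Arrays.asList("foo"));
    }

    // Suggested form: an immutable one-element set.
    static Set<String> viaSingleton() {
        return Collections.singleton("foo");
    }

    public static void main(String[] args) {
        System.out.println(viaHashSet().equals(viaSingleton())); // prints true
        try {
            viaSingleton().add("bar");
        } catch (UnsupportedOperationException e) {
            System.out.println("singleton set is immutable");
        }
    }
}
```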



