You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/10 21:42:57 UTC

[GitHub] [kafka] andymg3 opened a new pull request, #13104: KAFKA-14612: Make sure to write ConfigRecords to metadata log iff topic is created

andymg3 opened a new pull request, #13104:
URL: https://github.com/apache/kafka/pull/13104

   ### JIRA
   https://issues.apache.org/jira/browse/KAFKA-14612
   
   ### Details
   Makes sure we emit `ConfigRecord`s for a topic iff it actually gets created. Currently, we might emit `ConfigRecord`s even if the topic creation fails later in the `createTopics` method.
   
   ### Testing
   Enhanced pre-existing test `ReplicationControlManagerTest.testCreateTopicsWithConfigs`. I ran the tests without the changes to `ReplicationControlManager` and made sure each assertion ends up failing.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13104: KAFKA-14612: Make sure to write ConfigRecords to metadata log iff topic is created

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #13104:
URL: https://github.com/apache/kafka/pull/13104#discussion_r1066408319


##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -605,6 +617,16 @@ public void testCreateTopicsWithConfigs() throws Exception {
             "Null value not supported for topic configs: foo",
             result2.response().topics().find("bar").errorMessage()
         );
+
+        CreateTopicsRequestData request3 = new CreateTopicsRequestData();
+        request3.topics().add(new CreatableTopic().setName("baz")
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        ControllerResult<CreateTopicsResponseData> result3 =
+            replicationControl.createTopics(request3, Collections.singleton("baz"));
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result3.response().topics().find("baz").errorCode());
+        assertEquals(0, result3.records().size());

Review Comment:
   nit: a better assertion is this:
   ```java
   assertEquals(Collections.emptyList(), result3.records());
   ```
   It gives more information in case of a failure.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -725,6 +725,8 @@ private ApiError createTopic(CreatableTopic topic,
         records.add(new ApiMessageAndVersion(new TopicRecord().
             setName(topic.name()).
             setTopicId(topicId), (short) 0));
+        // ConfigRecords go after TopicRecord but before PartitionRecord(s).
+        records.addAll(configResult.records());

Review Comment:
   This looks a little suspicious. Don't we need to pull the configs for the topic that is being created? We should have a test case with multiple topics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13104: KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #13104:
URL: https://github.com/apache/kafka/pull/13104#discussion_r1067582346


##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -605,6 +617,45 @@ public void testCreateTopicsWithConfigs() throws Exception {
             "Null value not supported for topic configs: foo",
             result2.response().topics().find("bar").errorMessage()
         );
+
+        CreateTopicsRequestData request3 = new CreateTopicsRequestData();
+        request3.topics().add(new CreatableTopic().setName("baz")
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        ControllerResult<CreateTopicsResponseData> result3 =
+            replicationControl.createTopics(request3, Collections.singleton("baz"));
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result3.response().topics().find("baz").errorCode());
+        assertEquals(Collections.emptyList(), result3.records());
+
+        // Test request with multiple topics together.
+        CreateTopicsRequestData request4 = new CreateTopicsRequestData();
+        String batchedTopic1 = "batched-topic-1";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic1)
+            .setNumPartitions(-1).setReplicationFactor((short) -1)
+            .setConfigs(validConfigs));
+        String batchedTopic2 = "batched-topic2";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic2)
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        Set<String> request4Topics = new HashSet<>();
+        request4Topics.add(batchedTopic1);
+        request4Topics.add(batchedTopic2);
+        ControllerResult<CreateTopicsResponseData> result4 =
+            replicationControl.createTopics(request4, request4Topics);
+
+        assertEquals(Errors.NONE.code(), result4.response().topics().find(batchedTopic1).errorCode());
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result4.response().topics().find(batchedTopic2).errorCode());
+
+        assertEquals(3, result4.records().size());
+        assertEquals(TopicRecord.class, result4.records().get(0).message().getClass());
+        TopicRecord batchedTopic1Record = (TopicRecord) result4.records().get(0).message();
+        assertEquals(batchedTopic1, batchedTopic1Record.name());
+        assertEquals(ConfigRecord.class, result4.records().get(1).message().getClass());
+        assertEquals(batchedTopic1, ((ConfigRecord) result4.records().get(1).message()).resourceName());

Review Comment:
   Wonder if it would be better to assert the full ConfigRecord instead of just checking one field? 



##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -172,20 +172,30 @@ ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
         Map<ConfigResource, ApiError> outputResults = new HashMap<>();
         for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry :
                 configChanges.entrySet()) {
-            incrementalAlterConfigResource(resourceEntry.getKey(),
+            ApiError apiError = incrementalAlterConfigResource(resourceEntry.getKey(),
                 resourceEntry.getValue(),
                 newlyCreatedResource,
-                outputRecords,
-                outputResults);
+                outputRecords);
+            outputResults.put(resourceEntry.getKey(), apiError);
         }
         return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
-    private void incrementalAlterConfigResource(ConfigResource configResource,
+    ControllerResult<ApiError> incrementalAlterConfig(
+        ConfigResource configResource, Map<String, Entry<OpType, String>> keyToOps,
+        boolean newlyCreatedResource) {
+        List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
+        ApiError apiError = incrementalAlterConfigResource(configResource,
+            keyToOps,
+            newlyCreatedResource,
+            outputRecords);
+        return ControllerResult.atomicOf(outputRecords, apiError);
+    }
+
+    private ApiError incrementalAlterConfigResource(ConfigResource configResource,
                                                 Map<String, Entry<OpType, String>> keysToOps,
                                                 boolean newlyCreatedResource,
-                                                List<ApiMessageAndVersion> outputRecords,
-                                                Map<ConfigResource, ApiError> outputResults) {
+                                                List<ApiMessageAndVersion> outputRecords) {

Review Comment:
   nit: can we align the arguments?



##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -172,20 +172,30 @@ ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
         Map<ConfigResource, ApiError> outputResults = new HashMap<>();
         for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry :
                 configChanges.entrySet()) {
-            incrementalAlterConfigResource(resourceEntry.getKey(),
+            ApiError apiError = incrementalAlterConfigResource(resourceEntry.getKey(),
                 resourceEntry.getValue(),
                 newlyCreatedResource,
-                outputRecords,
-                outputResults);
+                outputRecords);
+            outputResults.put(resourceEntry.getKey(), apiError);
         }
         return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
-    private void incrementalAlterConfigResource(ConfigResource configResource,
+    ControllerResult<ApiError> incrementalAlterConfig(
+        ConfigResource configResource, Map<String, Entry<OpType, String>> keyToOps,

Review Comment:
   nit: move second argument to next line. Not everyone agrees, but I also like to put the closing parenthesis on a new line.
   ```java
   ControllerResult<ApiError> incrementalAlterConfig(
       ConfigResource configResource, 
       Map<String, Entry<OpType, String>> keyToOps,
       boolean newlyCreatedResource
   ) {
   ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] andymg3 commented on a diff in pull request #13104: KAFKA-14612: Make sure to write ConfigRecords to metadata log iff topic is created

Posted by GitBox <gi...@apache.org>.
andymg3 commented on code in PR #13104:
URL: https://github.com/apache/kafka/pull/13104#discussion_r1066412518


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -725,6 +725,8 @@ private ApiError createTopic(CreatableTopic topic,
         records.add(new ApiMessageAndVersion(new TopicRecord().
             setName(topic.name()).
             setTopicId(topicId), (short) 0));
+        // ConfigRecords go after TopicRecord but before PartitionRecord(s).
+        records.addAll(configResult.records());

Review Comment:
   You're right - let me fix that. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] andymg3 commented on a diff in pull request #13104: KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created

Posted by GitBox <gi...@apache.org>.
andymg3 commented on code in PR #13104:
URL: https://github.com/apache/kafka/pull/13104#discussion_r1068539671


##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -605,6 +617,45 @@ public void testCreateTopicsWithConfigs() throws Exception {
             "Null value not supported for topic configs: foo",
             result2.response().topics().find("bar").errorMessage()
         );
+
+        CreateTopicsRequestData request3 = new CreateTopicsRequestData();
+        request3.topics().add(new CreatableTopic().setName("baz")
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        ControllerResult<CreateTopicsResponseData> result3 =
+            replicationControl.createTopics(request3, Collections.singleton("baz"));
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result3.response().topics().find("baz").errorCode());
+        assertEquals(Collections.emptyList(), result3.records());
+
+        // Test request with multiple topics together.
+        CreateTopicsRequestData request4 = new CreateTopicsRequestData();
+        String batchedTopic1 = "batched-topic-1";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic1)
+            .setNumPartitions(-1).setReplicationFactor((short) -1)
+            .setConfigs(validConfigs));
+        String batchedTopic2 = "batched-topic2";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic2)
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        Set<String> request4Topics = new HashSet<>();
+        request4Topics.add(batchedTopic1);
+        request4Topics.add(batchedTopic2);
+        ControllerResult<CreateTopicsResponseData> result4 =
+            replicationControl.createTopics(request4, request4Topics);
+
+        assertEquals(Errors.NONE.code(), result4.response().topics().find(batchedTopic1).errorCode());
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result4.response().topics().find(batchedTopic2).errorCode());
+
+        assertEquals(3, result4.records().size());
+        assertEquals(TopicRecord.class, result4.records().get(0).message().getClass());
+        TopicRecord batchedTopic1Record = (TopicRecord) result4.records().get(0).message();
+        assertEquals(batchedTopic1, batchedTopic1Record.name());
+        assertEquals(ConfigRecord.class, result4.records().get(1).message().getClass());
+        assertEquals(batchedTopic1, ((ConfigRecord) result4.records().get(1).message()).resourceName());

Review Comment:
   Fair enough. It does make sure we are emitting the exact `ConfigRecord` expected. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] andymg3 commented on a diff in pull request #13104: KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created

Posted by GitBox <gi...@apache.org>.
andymg3 commented on code in PR #13104:
URL: https://github.com/apache/kafka/pull/13104#discussion_r1067622704


##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -172,20 +172,30 @@ ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
         Map<ConfigResource, ApiError> outputResults = new HashMap<>();
         for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry :
                 configChanges.entrySet()) {
-            incrementalAlterConfigResource(resourceEntry.getKey(),
+            ApiError apiError = incrementalAlterConfigResource(resourceEntry.getKey(),
                 resourceEntry.getValue(),
                 newlyCreatedResource,
-                outputRecords,
-                outputResults);
+                outputRecords);
+            outputResults.put(resourceEntry.getKey(), apiError);
         }
         return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
-    private void incrementalAlterConfigResource(ConfigResource configResource,
+    ControllerResult<ApiError> incrementalAlterConfig(
+        ConfigResource configResource, Map<String, Entry<OpType, String>> keyToOps,
+        boolean newlyCreatedResource) {
+        List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
+        ApiError apiError = incrementalAlterConfigResource(configResource,
+            keyToOps,
+            newlyCreatedResource,
+            outputRecords);
+        return ControllerResult.atomicOf(outputRecords, apiError);
+    }
+
+    private ApiError incrementalAlterConfigResource(ConfigResource configResource,
                                                 Map<String, Entry<OpType, String>> keysToOps,
                                                 boolean newlyCreatedResource,
-                                                List<ApiMessageAndVersion> outputRecords,
-                                                Map<ConfigResource, ApiError> outputResults) {
+                                                List<ApiMessageAndVersion> outputRecords) {

Review Comment:
   Yup. Fixed.



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -605,6 +617,45 @@ public void testCreateTopicsWithConfigs() throws Exception {
             "Null value not supported for topic configs: foo",
             result2.response().topics().find("bar").errorMessage()
         );
+
+        CreateTopicsRequestData request3 = new CreateTopicsRequestData();
+        request3.topics().add(new CreatableTopic().setName("baz")
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        ControllerResult<CreateTopicsResponseData> result3 =
+            replicationControl.createTopics(request3, Collections.singleton("baz"));
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result3.response().topics().find("baz").errorCode());
+        assertEquals(Collections.emptyList(), result3.records());
+
+        // Test request with multiple topics together.
+        CreateTopicsRequestData request4 = new CreateTopicsRequestData();
+        String batchedTopic1 = "batched-topic-1";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic1)
+            .setNumPartitions(-1).setReplicationFactor((short) -1)
+            .setConfigs(validConfigs));
+        String batchedTopic2 = "batched-topic2";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic2)
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        Set<String> request4Topics = new HashSet<>();
+        request4Topics.add(batchedTopic1);
+        request4Topics.add(batchedTopic2);
+        ControllerResult<CreateTopicsResponseData> result4 =
+            replicationControl.createTopics(request4, request4Topics);
+
+        assertEquals(Errors.NONE.code(), result4.response().topics().find(batchedTopic1).errorCode());
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result4.response().topics().find(batchedTopic2).errorCode());
+
+        assertEquals(3, result4.records().size());
+        assertEquals(TopicRecord.class, result4.records().get(0).message().getClass());
+        TopicRecord batchedTopic1Record = (TopicRecord) result4.records().get(0).message();
+        assertEquals(batchedTopic1, batchedTopic1Record.name());
+        assertEquals(ConfigRecord.class, result4.records().get(1).message().getClass());
+        assertEquals(batchedTopic1, ((ConfigRecord) result4.records().get(1).message()).resourceName());

Review Comment:
   I'd argue we already test that sort of deeper validation above in the form of:
   ```
           ctx.replay(result1.records());
           assertEquals(
               "notNull",
               ctx.configurationControl.getConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "foo")).get("foo")
           );
   ```
   
   The testing I'm doing here is more to test we only emit records for topic batchedTopic1. So my thought process was no need to duplicate the more deeper validation. 
   
   Thoughts?  



##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -172,20 +172,30 @@ ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
         Map<ConfigResource, ApiError> outputResults = new HashMap<>();
         for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry :
                 configChanges.entrySet()) {
-            incrementalAlterConfigResource(resourceEntry.getKey(),
+            ApiError apiError = incrementalAlterConfigResource(resourceEntry.getKey(),
                 resourceEntry.getValue(),
                 newlyCreatedResource,
-                outputRecords,
-                outputResults);
+                outputRecords);
+            outputResults.put(resourceEntry.getKey(), apiError);
         }
         return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
-    private void incrementalAlterConfigResource(ConfigResource configResource,
+    ControllerResult<ApiError> incrementalAlterConfig(
+        ConfigResource configResource, Map<String, Entry<OpType, String>> keyToOps,

Review Comment:
   Changed to that. I agree the closing parenthesis on the next line reads a bit better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] andymg3 commented on a diff in pull request #13104: KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created

Posted by GitBox <gi...@apache.org>.
andymg3 commented on code in PR #13104:
URL: https://github.com/apache/kafka/pull/13104#discussion_r1067622366


##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -605,6 +617,45 @@ public void testCreateTopicsWithConfigs() throws Exception {
             "Null value not supported for topic configs: foo",
             result2.response().topics().find("bar").errorMessage()
         );
+
+        CreateTopicsRequestData request3 = new CreateTopicsRequestData();
+        request3.topics().add(new CreatableTopic().setName("baz")
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        ControllerResult<CreateTopicsResponseData> result3 =
+            replicationControl.createTopics(request3, Collections.singleton("baz"));
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result3.response().topics().find("baz").errorCode());
+        assertEquals(Collections.emptyList(), result3.records());
+
+        // Test request with multiple topics together.
+        CreateTopicsRequestData request4 = new CreateTopicsRequestData();
+        String batchedTopic1 = "batched-topic-1";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic1)
+            .setNumPartitions(-1).setReplicationFactor((short) -1)
+            .setConfigs(validConfigs));
+        String batchedTopic2 = "batched-topic2";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic2)
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        Set<String> request4Topics = new HashSet<>();
+        request4Topics.add(batchedTopic1);
+        request4Topics.add(batchedTopic2);
+        ControllerResult<CreateTopicsResponseData> result4 =
+            replicationControl.createTopics(request4, request4Topics);
+
+        assertEquals(Errors.NONE.code(), result4.response().topics().find(batchedTopic1).errorCode());
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result4.response().topics().find(batchedTopic2).errorCode());
+
+        assertEquals(3, result4.records().size());
+        assertEquals(TopicRecord.class, result4.records().get(0).message().getClass());
+        TopicRecord batchedTopic1Record = (TopicRecord) result4.records().get(0).message();
+        assertEquals(batchedTopic1, batchedTopic1Record.name());
+        assertEquals(ConfigRecord.class, result4.records().get(1).message().getClass());
+        assertEquals(batchedTopic1, ((ConfigRecord) result4.records().get(1).message()).resourceName());

Review Comment:
   I'd argue we already have that sort of deeper validation above in the form of:
   ```
           ctx.replay(result1.records());
           assertEquals(
               "notNull",
               ctx.configurationControl.getConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "foo")).get("foo")
           );
   ```
   
   The testing I'm doing here is more to test we only emit records for topic batchedTopic1. So my thought process was no need to duplicate the more deeper validation. 
   
   Thoughts?  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13104: KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #13104:
URL: https://github.com/apache/kafka/pull/13104#discussion_r1068504478


##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -605,6 +617,45 @@ public void testCreateTopicsWithConfigs() throws Exception {
             "Null value not supported for topic configs: foo",
             result2.response().topics().find("bar").errorMessage()
         );
+
+        CreateTopicsRequestData request3 = new CreateTopicsRequestData();
+        request3.topics().add(new CreatableTopic().setName("baz")
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        ControllerResult<CreateTopicsResponseData> result3 =
+            replicationControl.createTopics(request3, Collections.singleton("baz"));
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result3.response().topics().find("baz").errorCode());
+        assertEquals(Collections.emptyList(), result3.records());
+
+        // Test request with multiple topics together.
+        CreateTopicsRequestData request4 = new CreateTopicsRequestData();
+        String batchedTopic1 = "batched-topic-1";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic1)
+            .setNumPartitions(-1).setReplicationFactor((short) -1)
+            .setConfigs(validConfigs));
+        String batchedTopic2 = "batched-topic2";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic2)
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        Set<String> request4Topics = new HashSet<>();
+        request4Topics.add(batchedTopic1);
+        request4Topics.add(batchedTopic2);
+        ControllerResult<CreateTopicsResponseData> result4 =
+            replicationControl.createTopics(request4, request4Topics);
+
+        assertEquals(Errors.NONE.code(), result4.response().topics().find(batchedTopic1).errorCode());
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result4.response().topics().find(batchedTopic2).errorCode());
+
+        assertEquals(3, result4.records().size());
+        assertEquals(TopicRecord.class, result4.records().get(0).message().getClass());
+        TopicRecord batchedTopic1Record = (TopicRecord) result4.records().get(0).message();
+        assertEquals(batchedTopic1, batchedTopic1Record.name());
+        assertEquals(ConfigRecord.class, result4.records().get(1).message().getClass());
+        assertEquals(batchedTopic1, ((ConfigRecord) result4.records().get(1).message()).resourceName());

Review Comment:
   Hmm, it is a little different though. In once case, we're validating the result of applying the records. In the other, we're validating the records themselves. You can argue one implies the other, but maybe there's no harm being thorough? By the way, I was also thinking this might simplify the assertions and get rid of the messy-looking cast:
   
   ```java
           assertEquals(new ConfigRecord()
               .setResourceName(batchedTopic1)
               .setResourceType(ConfigResource.Type.TOPIC.id())
               .setName("foo")
               .setValue("notNull"),
               result4.records().get(1).message());
   ```
   Anyway, will leave it up to you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13104: KAFKA-14612: Make sure to write ConfigRecords to metadata log iff topic is created

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #13104:
URL: https://github.com/apache/kafka/pull/13104#discussion_r1066409782


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -725,6 +725,8 @@ private ApiError createTopic(CreatableTopic topic,
         records.add(new ApiMessageAndVersion(new TopicRecord().
             setName(topic.name()).
             setTopicId(topicId), (short) 0));
+        // ConfigRecords go after TopicRecord but before PartitionRecord(s).
+        records.addAll(configResult.records());

Review Comment:
   This looks a little suspicious. Don't we need to pull only the respective configs for the topic that is being created? We should have a test case with multiple topics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji merged pull request #13104: KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created

Posted by GitBox <gi...@apache.org>.
hachikuji merged PR #13104:
URL: https://github.com/apache/kafka/pull/13104


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org