Posted to jira@kafka.apache.org by "lihaosky (via GitHub)" <gi...@apache.org> on 2023/06/12 22:22:11 UTC

[GitHub] [kafka] lihaosky opened a new pull request, #13846: [KAFKA-15022][1/N] add configs to control rack aware assignment

lihaosky opened a new pull request, #13846:
URL: https://github.com/apache/kafka/pull/13846

   ### Description
   Configs to control rack aware assignment for [KIP-925](https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams)
   
   ### Testing
   Unit test
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lihaosky commented on a diff in pull request #13846: KAFKA-15022: [1/N] add configs to control rack aware assignment

Posted by "lihaosky (via GitHub)" <gi...@apache.org>.
lihaosky commented on code in PR #13846:
URL: https://github.com/apache/kafka/pull/13846#discussion_r1238821866


##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1375,6 +1375,18 @@ public void shouldReturnDefaultClientSupplier() {
         assertTrue(supplier instanceof DefaultKafkaClientSupplier);
     }
 
+    @Test
+    public void shouldReturnDefaultRackAwareAssignmentConfig() {
+        final String strategy = streamsConfig.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY);
+        assertEquals(StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE, strategy);
+    }
+

Review Comment:
   Makes sense. I can use raw values.





[GitHub] [kafka] lihaosky closed pull request #13846: KAFKA-15022: [1/N] add configs to control rack aware assignment

Posted by "lihaosky (via GitHub)" <gi...@apache.org>.
lihaosky closed pull request #13846: KAFKA-15022: [1/N] add configs to control rack aware assignment
URL: https://github.com/apache/kafka/pull/13846




[GitHub] [kafka] ableegoldman commented on a diff in pull request #13846: KAFKA-15022: [1/N] add configs to control rack aware assignment

Posted by "ableegoldman (via GitHub)" <gi...@apache.org>.
ableegoldman commented on code in PR #13846:
URL: https://github.com/apache/kafka/pull/13846#discussion_r1275629503


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -755,6 +755,18 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";
     public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
 
+    public static final String RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE = "NONE";
+    public static final String RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC";
+    public static final String RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY = "MIN_TRAFFIC_BALANCE_SUBTOPOLOGY";
+
+    /** {@code } rack.aware.assignment.strategy */
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY = "rack.aware.assignment.strategy";

Review Comment:
   ```suggestion
       public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy";
   ```
   
   We might have missed this during the KIP discussion, I don't remember, but the standard in StreamsConfig is to suffix the config variables with `_CONFIG`. We should definitely do this here/now and update the KIP and discussion thread if necessary.





[GitHub] [kafka] cadonna commented on a diff in pull request #13846: KAFKA-15022: [1/N] add configs to control rack aware assignment

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13846:
URL: https://github.com/apache/kafka/pull/13846#discussion_r1238377947


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -755,6 +755,18 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";
     public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
 
+    public static final String RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE = "NONE";
+    public static final String RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC";
+    public static final String RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY = "MIN_TRAFFIC_BALANCE_SUBTOPOLOGY";
+
+    /** {@code } rack.aware.assignment.strategy */
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY = "rack.aware.assignment.strategy";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning"

Review Comment:
   ```suggestion
       public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of topic partitions into account when assigning"
   ```



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1375,6 +1375,18 @@ public void shouldReturnDefaultClientSupplier() {
         assertTrue(supplier instanceof DefaultKafkaClientSupplier);
     }
 
+    @Test
+    public void shouldReturnDefaultRackAwareAssignmentConfig() {
+        final String strategy = streamsConfig.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY);
+        assertEquals(StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE, strategy);
+    }
+

Review Comment:
   I think you also need to test setting the three valid values. In the end they are part of the public API, and there should be a test that verifies that they are accepted.
   
   The test should be like this:
   https://github.com/apache/kafka/blob/474053d2973b8790e50ccfe1bb0699694b0de1c7/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java#L615
   
   Do not use the constants `RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE`, `RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC`, or `RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY` because you want to ensure that the test fails if the content of those constants changes.
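
The reasoning in the review comment above can be illustrated with a minimal, self-contained sketch in plain Java with no Kafka dependency. The class name, the `validate` helper, and the use of `IllegalArgumentException` are assumptions made for illustration only. A real test would go through `StreamsConfig` and its validator, as in the linked example. The point is that the check feeds raw string literals to the validator, so renaming a constant's value would make the check fail.

```java
import java.util.Arrays;
import java.util.List;

public class RackAwareStrategyValidation {
    // Hypothetical stand-ins for the three constants added in the diff.
    public static final String STRATEGY_NONE = "NONE";
    public static final String STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC";
    public static final String STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY =
        "MIN_TRAFFIC_BALANCE_SUBTOPOLOGY";

    private static final List<String> VALID = Arrays.asList(
        STRATEGY_NONE, STRATEGY_MIN_TRAFFIC, STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY);

    // Simplified stand-in for a "value must be one of" validator.
    public static String validate(final String value) {
        if (!VALID.contains(value)) {
            throw new IllegalArgumentException(
                "Invalid value " + value + " for rack.aware.assignment.strategy");
        }
        return value;
    }

    public static void main(final String[] args) {
        // Raw literals on purpose: if the content of a constant changes,
        // one of these calls throws and the check fails.
        final String[] rawValues = {"NONE", "MIN_TRAFFIC", "MIN_TRAFFIC_BALANCE_SUBTOPOLOGY"};
        for (final String raw : rawValues) {
            validate(raw);
        }
        System.out.println("All three raw values were accepted");
    }
}
```

Had the loop iterated over the constants instead of the literals, the check would keep passing after a value change, which is the situation the reviewer wants to avoid.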
   



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -890,6 +902,12 @@ public class StreamsConfig extends AbstractConfig {
                     in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2),
                     Importance.MEDIUM,
                     PROCESSING_GUARANTEE_DOC)
+            .define(RACK_AWARE_ASSIGNMENT_STRATEGY,
+                Type.STRING,
+                RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE,
+                in(RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC, RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY),

Review Comment:
   nit: I think you could also use an enum like here: https://github.com/apache/kafka/blob/c958d8719dc2588bd27958b54a65dea514808796/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L907 
   
   But that is not needed, I just wanted to give you an alternative.
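
A rough sketch of what the enum alternative mentioned above might look like, in plain Java without Kafka on the classpath. The enum `RackAwareStrategy` and the helper `validValues` are hypothetical names, not taken from the PR or from the linked line. The idea is only that the list of accepted strings is derived from the enum instead of being spelled out as separate `String` constants.

```java
import java.util.Arrays;

public class RackAwareStrategyEnumSketch {
    // Hypothetical enum; the names mirror the three values in the diff.
    enum RackAwareStrategy {
        NONE,
        MIN_TRAFFIC,
        MIN_TRAFFIC_BALANCE_SUBTOPOLOGY
    }

    // Derives the accepted config strings from the enum, in declaration order.
    public static String[] validValues() {
        return Arrays.stream(RackAwareStrategy.values())
            .map(RackAwareStrategy::name)
            .toArray(String[]::new);
    }

    public static void main(final String[] args) {
        // The resulting array could be handed to an "in(...)"-style validator.
        System.out.println(String.join(", ", validValues()));
    }
}
```

With this shape, adding a strategy means adding one enum constant, and the validator's accepted values follow automatically.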





[GitHub] [kafka] lihaosky commented on pull request #13846: KAFKA-15022: [1/N] add configs to control rack aware assignment

Posted by "lihaosky (via GitHub)" <gi...@apache.org>.
lihaosky commented on PR #13846:
URL: https://github.com/apache/kafka/pull/13846#issuecomment-1603047119

   > Thanks @lihaosky for the PR!
   > 
   > Are you sure you want to start with adding the config?
   > 
   > I would have added the config as the last step in the implementation. If we add it now, the config will show up in the docs but it will not or only partially work until the implementation is finished.
   
   @cadonna, thanks for reviewing. I can move this to the last PR. I thought people would use the flags once the feature is announced in some release...

