Posted to jira@kafka.apache.org by "cadonna (via GitHub)" <gi...@apache.org> on 2023/06/22 11:26:51 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #13846: KAFKA-15022: [1/N] add configs to control rack aware assignment

cadonna commented on code in PR #13846:
URL: https://github.com/apache/kafka/pull/13846#discussion_r1238377947


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -755,6 +755,18 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";
     public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
 
+    public static final String RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE = "NONE";
+    public static final String RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC";
+    public static final String RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY = "MIN_TRAFFIC_BALANCE_SUBTOPOLOGY";
+
+    /** {@code } rack.aware.assignment.strategy */
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY = "rack.aware.assignment.strategy";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning"

Review Comment:
   ```suggestion
       public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of topic partitions into account when assigning"
   ```



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1375,6 +1375,18 @@ public void shouldReturnDefaultClientSupplier() {
         assertTrue(supplier instanceof DefaultKafkaClientSupplier);
     }
 
+    @Test
+    public void shouldReturnDefaultRackAwareAssignmentConfig() {
+        final String strategy = streamsConfig.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY);
+        assertEquals(StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE, strategy);
+    }
+

Review Comment:
   I think you also need to test setting the three valid values. They are part of the public API, so there should be a test that verifies that they are accepted.
   
   The test should be like this:
   https://github.com/apache/kafka/blob/474053d2973b8790e50ccfe1bb0699694b0de1c7/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java#L615
   
   Do not use the constants `RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE`, `RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC`, or `RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY` in that test. Use the literal strings instead, because you want to ensure that the test fails if the content of those constants changes.
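
   A minimal, self-contained sketch of the idea, not the actual `StreamsConfigTest` code: the class name, the shortened constant names, and the `isAccepted` helper are hypothetical stand-ins for the config validation, chosen so that the example runs without the Kafka dependency. The point it illustrates is that the checks use string literals, so a change to a constant's value makes a check fail.

   ```java
   import java.util.Arrays;
   import java.util.List;

   final class RackAwareStrategyLiteralCheck {
       // Stand-ins for the constants in the diff (shortened, hypothetical names).
       static final String STRATEGY_NONE = "NONE";
       static final String STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC";
       static final String STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY = "MIN_TRAFFIC_BALANCE_SUBTOPOLOGY";

       // The accepted values are built from the constants, as in the define(...) call.
       static final List<String> ACCEPTED = Arrays.asList(
           STRATEGY_NONE, STRATEGY_MIN_TRAFFIC, STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY);

       // Hypothetical stand-in for the config validation: exact, case-sensitive match.
       static boolean isAccepted(final String value) {
           return ACCEPTED.contains(value);
       }

       public static void main(final String[] args) {
           // Literals, not constants: changing a constant's value above breaks a check here.
           final String[] publicValues = {"NONE", "MIN_TRAFFIC", "MIN_TRAFFIC_BALANCE_SUBTOPOLOGY"};
           for (final String value : publicValues) {
               if (!isAccepted(value)) {
                   throw new AssertionError("Expected value to be accepted: " + value);
               }
           }
           // A value outside the public set must be rejected.
           if (isAccepted("UNKNOWN")) {
               throw new AssertionError("Expected value to be rejected: UNKNOWN");
           }
       }
   }
   ```

   In the real test, the same literals would be put into the properties under `StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY` and passed to a new `StreamsConfig`.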
   



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -890,6 +902,12 @@ public class StreamsConfig extends AbstractConfig {
                     in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2),
                     Importance.MEDIUM,
                     PROCESSING_GUARANTEE_DOC)
+            .define(RACK_AWARE_ASSIGNMENT_STRATEGY,
+                Type.STRING,
+                RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE,
+                in(RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC, RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY),

Review Comment:
   nit: I think you could also use an enum like here: https://github.com/apache/kafka/blob/c958d8719dc2588bd27958b54a65dea514808796/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L907 
   
   That is not needed, though. I just wanted to give you an alternative.
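
   A rough sketch of what the enum alternative could look like, under stated assumptions: the `Strategy` enum and both helper methods are hypothetical and are not taken from the PR or from the linked line. It only shows how the list of valid strings can be derived from an enum instead of from three separate constants.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   final class RackAwareStrategyEnumSketch {
       // Hypothetical enum holding the three strategies; not part of the PR.
       enum Strategy { NONE, MIN_TRAFFIC, MIN_TRAFFIC_BALANCE_SUBTOPOLOGY }

       // Derive the valid config strings from the enum, in declaration order.
       static List<String> validValues() {
           return Arrays.stream(Strategy.values())
               .map(Strategy::name)
               .collect(Collectors.toList());
       }

       // Map a config string back to the enum; valueOf throws
       // IllegalArgumentException for a name that is not declared.
       static Strategy parse(final String value) {
           return Strategy.valueOf(value);
       }
   }
   ```

   With this shape, adding a strategy means adding one enum constant, and the list of valid values follows automatically.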



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org