Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/15 19:15:28 UTC

[GitHub] [kafka] vvcephei commented on a diff in pull request #12641: KAFKA-14209 : Change Topology optimization to accept list of rules 1/3

vvcephei commented on code in PR #12641:
URL: https://github.com/apache/kafka/pull/12641#discussion_r972139781


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -233,6 +235,15 @@ public class StreamsConfig extends AbstractConfig {
     @SuppressWarnings("WeakerAccess")
     public static final String CLIENT_TAG_PREFIX = "client.tag.";
 
+    /** {@code topology.optimization} */
+    public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
+    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka "
+        + "Streams if it should optimize the topology and what optimizations to apply. "
+        + "Acceptable values are: \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
+        + "or a comma separated list of specific optimizations: "
+        + "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\"). "
+        + "Disabled by default.";

Review Comment:
   ```suggestion
           + NO_OPTIMIZATION + " by default.";
   ```
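The diff's doc string has a related quoting issue. In `\"+NO_OPTIMIZATION+\"` the escaped quotes keep the constant names inside the string literal, so the rendered doc would show the text `+NO_OPTIMIZATION+` rather than the constant's value. Below is a small standalone sketch of the difference. It assumes `NO_OPTIMIZATION` is "none" and `OPTIMIZE` is "all", which are the values the error-message comment below quotes. The class and method names are hypothetical.

```java
// Standalone sketch: escaped-quote style from the diff vs. real concatenation.
// Assumption: NO_OPTIMIZATION = "none" and OPTIMIZE = "all" (values quoted in the review).
public class DocStringQuoting {
    static final String NO_OPTIMIZATION = "none";
    static final String OPTIMIZE = "all";

    // As written in the diff: the \" escapes keep +NO_OPTIMIZATION+ inside the literal,
    // so the constants are never referenced and their names end up in the text.
    static String literal() {
        return "Acceptable values are: \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\".";
    }

    // Closing the literal before each + splices in the constant's value instead.
    static String concatenated() {
        return "Acceptable values are: \"" + NO_OPTIMIZATION + "\", \"" + OPTIMIZE + "\".";
    }

    public static void main(final String[] args) {
        System.out.println(literal());      // Acceptable values are: "+NO_OPTIMIZATION+", "+OPTIMIZE+".
        System.out.println(concatenated()); // Acceptable values are: "none", "all".
    }
}
```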



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1653,6 +1678,34 @@ private Map<String, Object> getClientCustomProps() {
         return props;
     }
 
+    public static Set<String> verifyTopologyOptimizationConfigs(final String config) {
+        final List<String> configs = Arrays.asList(config.split("\\s*,\\s*"));
+        final Set<String> verifiedConfigs = new HashSet<>();
+        // Verify it doesn't contain none or all plus a list of optimizations
+        if (configs.contains(NO_OPTIMIZATION) || configs.contains(OPTIMIZE)) {
+            if (configs.size() > 1) {
+                throw new ConfigException("A topology can either not be optimized with " + NO_OPTIMIZATION + " "
+                                              + "or optimized. If you want to optimize the "
+                                              + "topology, you can choose between all "
+                                              + "optimizations with " + OPTIMIZE + " " + "or "
+                                              + "specific optimizations by specifying a comma "
+                                              + "separated list.");

Review Comment:
   I took a closer look at the text of this error, and I'm afraid it might confuse users. The rendered text will say "A topology can either not be optimized with none or optimized.", which I find hard to parse, both because the sentence structure is confusing and because it mentions the config that turns optimizations off ("none") but not the one that turns them on ("all").
   
   ```suggestion
                   throw new ConfigException(
                       "\"" + config + "\" is not a valid optimization config. " +
                       "Valid optimization configs are: " +
                       "\"" + OPTIMIZE + "\" to enable all optimizations, " +
                       "\"" + NO_OPTIMIZATION + "\" to disable all optimizations, " +
                       "or a comma separated list of specific optimizations: " +
                       "(\"" + REUSE_KTABLE_SOURCE_TOPICS + "\", " +
                       "\"" + MERGE_REPARTITION_TOPICS + "\")."
                   );
   ```
   
   Realistically, I'd probably build the string for the second part of that message ("Valid ...") once and save it in a private constant, then use it in both the doc string and the validation error.
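A minimal sketch of that idea. Only "none" and "all" are confirmed by the comment above; the values of `REUSE_KTABLE_SOURCE_TOPICS` and `MERGE_REPARTITION_TOPICS` are assumptions, and `VALID_VALUES_MSG` and `invalidConfigMessage` are hypothetical names. The shared text is built once and appended to both the doc string and the error message, which could then be passed to `ConfigException`.

```java
// Sketch: build the "Valid ..." text once and reuse it in the doc and the error.
// Constant values for the two specific optimizations are assumptions for illustration.
public class OptimizationMessages {
    static final String NO_OPTIMIZATION = "none";
    static final String OPTIMIZE = "all";
    static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
    static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";

    // Hypothetical shared constant; every value is wrapped in escaped quotes on both sides.
    private static final String VALID_VALUES_MSG =
        "Valid optimization configs are: "
            + "\"" + OPTIMIZE + "\" to enable all optimizations, "
            + "\"" + NO_OPTIMIZATION + "\" to disable all optimizations, "
            + "or a comma separated list of specific optimizations: "
            + "(\"" + REUSE_KTABLE_SOURCE_TOPICS + "\", "
            + "\"" + MERGE_REPARTITION_TOPICS + "\").";

    // Doc string reuses the shared text, then states the default.
    static final String TOPOLOGY_OPTIMIZATION_DOC =
        "A configuration telling Kafka Streams if it should optimize the topology "
            + "and what optimizations to apply. " + VALID_VALUES_MSG
            + " \"" + NO_OPTIMIZATION + "\" by default.";

    // Hypothetical helper producing the validation error text for a rejected value.
    static String invalidConfigMessage(final String config) {
        return "\"" + config + "\" is not a valid optimization config. " + VALID_VALUES_MSG;
    }

    public static void main(final String[] args) {
        System.out.println(TOPOLOGY_OPTIMIZATION_DOC);
        System.out.println(invalidConfigMessage("all,none"));
    }
}
```

Keeping the list in one place means a new optimization only has to be added once for the docs and the error to stay in sync.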



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1263,6 +1263,54 @@ public void testInvalidSecurityProtocol() {
         assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {
+        final String value = String.join(",", StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("A topology can either not be optimized with"));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOffAndSet() {
+        final String value = String.join(",", StreamsConfig.NO_OPTIMIZATION, StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("A topology can either not be optimized with"));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenOptimizationDoesNotExist() {
+        final String value = String.join(",",
+                                         StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
+                                         "topology.optimization.does.not.exist",
+                                         StreamsConfig.MERGE_REPARTITION_TOPICS);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("Unrecognized config."));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationDoesNotExist() {
+        final String value = String.join(",", "topology.optimization.does.not.exist");
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("Unrecognized config."));
+    }
+
+    @Test
+    public void shouldAllowMultipleOptimizations() {

Review Comment:
   Can we add a test that "all" results in all the expected optimizations being listed? And one that "none" results in an empty list?
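For illustration, here is a hypothetical sketch of a parser whose result those two tests could check. It is not the PR's implementation, since the rest of `verifyTopologyOptimizationConfigs` is not shown above. It reuses the diff's `"\\s*,\\s*"` split and the "none"/"all" exclusivity rule. It assumes "all" expands to exactly the two optimizations named in the doc string, and it uses `IllegalArgumentException` in place of Kafka's `ConfigException` to stay dependency-free.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical parser sketch; IllegalArgumentException stands in for ConfigException.
public class OptimizationParser {
    static final String NO_OPTIMIZATION = "none";
    static final String OPTIMIZE = "all";
    static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
    static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";

    // Assumption: "all" expands to exactly these two specific optimizations.
    static final Set<String> ALL_OPTIMIZATIONS = Collections.unmodifiableSet(
        new HashSet<>(Arrays.asList(REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS)));

    static Set<String> parse(final String config) {
        // Same split as the diff: commas with optional surrounding whitespace.
        final List<String> configs = Arrays.asList(config.split("\\s*,\\s*"));
        // "none" and "all" are only valid on their own.
        if ((configs.contains(NO_OPTIMIZATION) || configs.contains(OPTIMIZE)) && configs.size() > 1) {
            throw new IllegalArgumentException("\"" + config + "\" is not a valid optimization config.");
        }
        if (configs.contains(OPTIMIZE)) {
            return ALL_OPTIMIZATIONS;       // every known optimization
        }
        if (configs.contains(NO_OPTIMIZATION)) {
            return Collections.emptySet();  // nothing enabled
        }
        // Otherwise every entry must be a known specific optimization.
        final Set<String> verified = new HashSet<>();
        for (final String c : configs) {
            if (!ALL_OPTIMIZATIONS.contains(c)) {
                throw new IllegalArgumentException("Unrecognized config. \"" + c + "\"");
            }
            verified.add(c);
        }
        return verified;
    }

    public static void main(final String[] args) {
        // The two checks requested in the review.
        check(parse("all").equals(ALL_OPTIMIZATIONS), "\"all\" should list every optimization");
        check(parse("none").isEmpty(), "\"none\" should give an empty set");
        System.out.println("ok");
    }

    private static void check(final boolean condition, final String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```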


