You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/06 15:00:08 UTC

[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12383: REVERT: Kip-770

vamossagar12 commented on code in PR #12383:
URL: https://github.com/apache/kafka/pull/12383#discussion_r914702189


##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -137,54 +129,19 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
             log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
         } else {
-            // If the user hasn't explicitly set the buffered.records.per.partition config, then leave it unbounded
-            // and rely on the input.buffer.max.bytes instead to keep the memory usage under control
-            maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
-                    ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
+            maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
         }
 
-        final boolean stateStoreCacheMaxBytesOverridden = isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides);
-        final boolean cacheMaxBytesBufferingOverridden = isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides);
-
-        if (!stateStoreCacheMaxBytesOverridden && !cacheMaxBytesBufferingOverridden) {
-            cacheSize = getTotalCacheSize(globalAppConfigs);
+        if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
         } else {
-            if (stateStoreCacheMaxBytesOverridden && cacheMaxBytesBufferingOverridden) {
-                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-                log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used",
-                         topologyName,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                         STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                         STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                         cacheSize);
-            } else if (cacheMaxBytesBufferingOverridden) {
-                cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-                log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " +
-                             "we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
-                         topologyName,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                         cacheSize,
-                         STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            } else {
-                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-            }
-
-            if (cacheSize != 0) {
-                log.warn("Topology {} is overriding cache size to {} but this will not have any effect as the "
-                             + "topology-level cache size config only controls whether record buffering is enabled "
-                             + "or disabled, thus the only valid override value is 0",
-                         topologyName, cacheSize);
-            } else {
-                log.info("Topology {} is overriding cache size to {}, record buffering will be disabled",
-                         topologyName, cacheSize);
-            }
+            cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
         }
 
         if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
             maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG);
-            log.info("Topology {} is overriding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
+            log.info("Topology {} is overridding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);

Review Comment:
   nit: typo in overriding.. I think that's how it was originally. Comment can be ignored :) 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##########
@@ -55,7 +55,6 @@ public class RecordQueue {
 
     private final Sensor droppedRecordsSensor;
     private final Sensor consumedSensor;
-    private long totalBytesBuffered;
     private long headRecordSizeInBytes;

Review Comment:
   I think headRecordSizeInBytes was also added as part of the PR which should be removed:
   
   https://github.com/apache/kafka/pull/11796/files#diff-2c19d764cad8fcbe7da8046cf0a01e525bc41a5e12e08e8c71d76c0f27ffc550R56



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1256,44 +1255,6 @@ public void shouldThrowExceptionWhenClientTagValueExceedMaxLimit() {
         );
     }
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldUseStateStoreCacheMaxBytesWhenBothOldAndNewConfigsAreSet() {
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100);
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
-        final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 100);
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() {
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
-        final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 10);
-    }
-
-    @Test
-    public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() {
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10);
-        final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 10);
-    }
-
-    @Test
-    public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
-        final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
-    }
-
-    @Test
-    public void testInvalidSecurityProtocol() {
-        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");
-        final ConfigException ce = assertThrows(ConfigException.class,
-                () -> new StreamsConfig(props));
-        assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
-    }
-

Review Comment:
   This test should be there as it seems unconnected to the revert. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org