Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/22 17:58:51 UTC

[GitHub] [kafka] vamossagar12 opened a new pull request #11796: Kip 770

vamossagar12 opened a new pull request #11796:
URL: https://github.com/apache/kafka/pull/11796


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #11796: Kip 770

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1053825346


   @guozhangwang, now I see failures in the raft/zk tests. Those seem unrelated to this PR.





[GitHub] [kafka] showuon commented on a change in pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#discussion_r837115301



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1038,14 +1041,15 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
             final StreamThread streamThread;
             synchronized (changeThreadCount) {
                 final int threadIdx = getNextThreadIndex();
-                final int numLiveThreads = getNumLiveStreamThreads();
-                final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
-                log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
-                         threadIdx, numLiveThreads + 1, cacheSizePerThread);
-                resizeThreadCache(cacheSizePerThread);
                 // Creating thread should hold the lock in order to avoid duplicate thread index.
                 // If the duplicate index happen, the metadata of thread may be duplicate too.
-                streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
+                // Also, we create the new thread with initial values of cache size and max buffer size as 0
+                // and then resize them later
+                streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
+                final int numLiveThreads = getNumLiveStreamThreads();
+                resizeThreadCacheAndBufferMemory(numLiveThreads + 1);

Review comment:
       I read the code change, and I agree with you. We don't need `+1` anymore. Nice catch!







[GitHub] [kafka] guozhangwang commented on a change in pull request #11796: Kip 770

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#discussion_r817035430



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##########
@@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
             log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
         } else {
-            maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+            maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+                    ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
         }
 
-        if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
-            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+        if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
+                isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+
+            if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",

Review comment:
       `overriding {} to {}` here could also be a bit confusing, since someone could read it as `the second overrides the first`. I'd suggest we just say `Both deprecated config {} and new config {} are set, hence {} is ignored and the new config {} (value {}) is used.`
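For readers following the hunk above: the new `+` lines fall back to `-1` when `buffered.records.per.partition` was never set explicitly. Kafka's `AbstractConfig#originals()` returns the properties as the user supplied them, without defaults filled in, so a value that only comes from the config definition's default does not count as set. Below is a minimal, self-contained sketch of that check. The class name and the `NOT_SET` constant are hypothetical, and reading `-1` as "the per-partition record cap is not in effect" is an assumption, not something stated in this thread.

```java
import java.util.Map;

// Hypothetical helper mirroring the diff hunk's fallback; not Kafka's actual API.
final class BufferedRecordsFallback {
    static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
    // Assumed meaning: the user never set the key, so the per-partition record cap is not in effect.
    static final int NOT_SET = -1;

    private BufferedRecordsFallback() { }

    static int maxBufferedSize(final Map<String, ?> originals) {
        // originals() holds only user-supplied keys, so a defaulted value is not found here.
        if (!originals.containsKey(BUFFERED_RECORDS_PER_PARTITION_CONFIG)) {
            return NOT_SET;
        }
        final Object value = originals.get(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
        // User props may hold an Integer or a String; this sketch accepts both.
        return value instanceof Number
            ? ((Number) value).intValue()
            : Integer.parseInt(value.toString().trim());
    }
}
```

Passing an empty map yields `-1`, while `Map.of("buffered.records.per.partition", 500)` yields `500`.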
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##########
@@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
             log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
         } else {
-            maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+            maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+                    ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
         }
 
-        if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
-            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+        if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
+                isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+
+            if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",
+                        topologyName,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                        CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                        cacheSize);
+            } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+                cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+                log.info("Topology {} is using deprecated config {}. overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);

Review comment:
       Ditto here: we could just say "Only deprecated config {} is set and hence its value {} would be used; we suggest setting the new config {} instead since the deprecated config {} would be removed in the future."

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##########
@@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
             log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
         } else {
-            maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+            maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+                    ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
         }
 
-        if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
-            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+        if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
+                isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+
+            if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",
+                        topologyName,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                        CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                        cacheSize);
+            } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+                cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+                log.info("Topology {} is using deprecated config {}. overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+            } else {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is overriding {} to {}", topologyName, STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);

Review comment:
       I think if just the new config is used (which is expected to be the norm in the future) we do not need to log anything.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1407,6 +1430,26 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         return props;
     }
 
+    public long getTotalCacheSize() {
+        // both deprecated and new config set. Warn and use the new one.
+        if (originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
+            log.warn("Use of deprecated config {} noticed.", CACHE_MAX_BYTES_BUFFERING_CONFIG);

Review comment:
       I'm afraid this warn log line would be confusing, since we print it in both cases, whereas here we are not actually using the deprecated config but the new one. I'd suggest we differentiate this line from line 1445 below, e.g.:
   
   * Remove this log line in 1436.
   * Update log line in 1438:
   
   ```
   Both deprecated config {} and the new config {} are set, hence {} is ignored and {} is used instead.
   ```
   
   * Update log line in 1446 as:
   
   ```
   Deprecated config {} is set, and will be used; we suggest setting the new config {} instead as deprecated {} would be removed in the future.
   ```
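Taken together, the suggestions in this review amount to a three-way precedence rule: if both configs are set, the new `statestore.cache.max.bytes` wins and the log names the ignored deprecated key; if only the deprecated `cache.max.bytes.buffering` is set, it is used with a deprecation hint; if only the new config is set, nothing is logged. The sketch below models that rule outside Kafka under stated assumptions: `CacheSizeResolver`, `Resolution` and the `fallback` parameter (standing in for whatever value applies when neither key is overridden) are hypothetical, and it returns the message instead of calling a logger.

```java
import java.util.Map;

// Hypothetical model of the suggested precedence and log wording; not Kafka's actual code.
final class CacheSizeResolver {
    static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes";
    // Deprecated by KIP-770 in favour of statestore.cache.max.bytes.
    static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";

    /** The cache size to use plus the log line to emit (null means "log nothing"). */
    static final class Resolution {
        final long cacheSize;
        final String logMessage;

        Resolution(final long cacheSize, final String logMessage) {
            this.cacheSize = cacheSize;
            this.logMessage = logMessage;
        }
    }

    private CacheSizeResolver() { }

    /** Resolves from explicitly set keys, using {@code fallback} when neither key is set. */
    static Resolution resolve(final Map<String, Long> overrides, final long fallback) {
        final boolean hasNew = overrides.containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG);
        final boolean hasDeprecated = overrides.containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG);

        if (hasNew && hasDeprecated) {
            // The new config wins; say explicitly which key is ignored.
            return new Resolution(overrides.get(STATESTORE_CACHE_MAX_BYTES_CONFIG), String.format(
                "Both deprecated config %s and the new config %s are set, hence %s is ignored and %s is used instead.",
                CACHE_MAX_BYTES_BUFFERING_CONFIG, STATESTORE_CACHE_MAX_BYTES_CONFIG,
                CACHE_MAX_BYTES_BUFFERING_CONFIG, STATESTORE_CACHE_MAX_BYTES_CONFIG));
        } else if (hasDeprecated) {
            // Only the deprecated key: use it, but nudge users toward the new one.
            return new Resolution(overrides.get(CACHE_MAX_BYTES_BUFFERING_CONFIG), String.format(
                "Deprecated config %s is set, and will be used; we suggest setting the new config %s instead as deprecated %s would be removed in the future.",
                CACHE_MAX_BYTES_BUFFERING_CONFIG, STATESTORE_CACHE_MAX_BYTES_CONFIG,
                CACHE_MAX_BYTES_BUFFERING_CONFIG));
        } else if (hasNew) {
            // Only the new key: the expected norm, so nothing is logged.
            return new Resolution(overrides.get(STATESTORE_CACHE_MAX_BYTES_CONFIG), null);
        }
        return new Resolution(fallback, null);
    }
}
```

Keeping the "new config only" branch silent follows the review's point that this case is expected to become the norm, so logging it would only add noise.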







[GitHub] [kafka] vamossagar12 edited a comment on pull request #11796: Kip 770

Posted by GitBox <gi...@apache.org>.
vamossagar12 edited a comment on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1049024510


   @ableegoldman, @guozhangwang, @mjsax this is the new PR for KIP-770 (a follow-up to https://github.com/apache/kafka/pull/11424/).
   
   The last 3-4 commits hold the new set of changes. I haven't added the StreamConfigUtils class yet; I was thinking I could do that in a separate ticket. WDYT? Also, the javadoc is still pending; I will add it once we are OK with the changes here.





[GitHub] [kafka] guozhangwang commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1074547962


   Thanks @vamossagar12 . I've merged the PR, and please go ahead and mark the ticket / KIP as for 3.3.0.





[GitHub] [kafka] vamossagar12 commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1081507993


   > Thanks for the new PR @vamossagar12 ! Sorry I wasn't able to take a look earlier but I just gave it a quick pass now. I took the liberty of moving the `#getTotalCacheSize` method to the `StreamsConfigUtils` class myself since I'm doing a quick warning log PR in that part of the code anyways.
   
   Thanks for that! I was planning to create a follow-up PR to move it to StreamsConfigUtils after finishing another issue I have been working on, but you have already done it!





[GitHub] [kafka] ableegoldman commented on a change in pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#discussion_r837073268



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1038,14 +1041,15 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
             final StreamThread streamThread;
             synchronized (changeThreadCount) {
                 final int threadIdx = getNextThreadIndex();
-                final int numLiveThreads = getNumLiveStreamThreads();
-                final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
-                log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
-                         threadIdx, numLiveThreads + 1, cacheSizePerThread);
-                resizeThreadCache(cacheSizePerThread);
                 // Creating thread should hold the lock in order to avoid duplicate thread index.
                 // If the duplicate index happen, the metadata of thread may be duplicate too.
-                streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
+                // Also, we create the new thread with initial values of cache size and max buffer size as 0
+                // and then resize them later
+                streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
+                final int numLiveThreads = getNumLiveStreamThreads();
+                resizeThreadCacheAndBufferMemory(numLiveThreads + 1);

Review comment:
       I think this feedback got lost in the shuffle when we reverted the original PR, but this needs to be fixed -- the `+ 1` was only necessary in the old code because we resized the cache before adding the new thread/computing the thread count. Now that we first create the new thread, the `numLiveThreads` count should accurately reflect the number of current threads, so we shouldn't be adding to it anymore.
   
   I'll include a fix for this on the side in a PR I'm doing so no worries 🙂 
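To make the off-by-one concrete: once the new thread is created first (with a 0-byte cache), the live-thread count already includes it, so dividing the total by `numLiveThreads + 1` under-allocates. The hypothetical model below is not Kafka's `KafkaStreams` code and ignores any share the real code may reserve for a global thread; it only reproduces the two variants of the division.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "create with 0, then resize"; not the real KafkaStreams implementation.
final class ThreadCacheModel {
    private final long totalCacheBytes;
    private final boolean addOneToLiveCount; // true reproduces the stale "+ 1"
    private final List<Long> perThreadCacheBytes = new ArrayList<>();

    ThreadCacheModel(final long totalCacheBytes, final boolean addOneToLiveCount) {
        this.totalCacheBytes = totalCacheBytes;
        this.addOneToLiveCount = addOneToLiveCount;
    }

    /** Creates a thread with a 0-byte cache, then resizes every live thread. */
    long addThread() {
        perThreadCacheBytes.add(0L);
        final int numLiveThreads = perThreadCacheBytes.size(); // already counts the new thread
        final int divisor = addOneToLiveCount ? numLiveThreads + 1 : numLiveThreads;
        final long perThread = totalCacheBytes / divisor;
        perThreadCacheBytes.replaceAll(ignored -> perThread);
        return perThread;
    }

    /** Sum of the cache bytes currently assigned across all threads. */
    long allocatedBytes() {
        long sum = 0;
        for (final long bytes : perThreadCacheBytes) {
            sum += bytes;
        }
        return sum;
    }
}
```

With a 1200-byte total and three threads added, the fixed variant ends at 400 bytes per thread (1200 allocated), while the `+ 1` variant ends at 300 per thread, leaving 300 bytes unallocated.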

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -971,14 +972,16 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
 
         queryableStoreProvider = new QueryableStoreProvider(globalStateStoreProvider);
         for (int i = 1; i <= numStreamThreads; i++) {
-            createAndAddStreamThread(cacheSizePerThread, i);
+            createAndAddStreamThread(0L, 0L, i);
         }
+        // Initially, all Stream Threads are created with 0 cache size and max buffer size and then resized here.

Review comment:
       Why do we do it this way, i.e., rather than just computing the size upfront and creating the threads with it? I find this a bit confusing, mainly because I can't tell whether there is a technical reason for doing it this way or it was just a design choice.







[GitHub] [kafka] vamossagar12 commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1073657931


   > @vamossagar12 could you resolve the conflicts before I re-trigger jenkins again?
   
   @guozhangwang done. Locally, only one test failed in streams: org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest#shouldRestoreState





[GitHub] [kafka] guozhangwang commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1069361318


   @vamossagar12 could you resolve the conflicts before I re-trigger jenkins again?





[GitHub] [kafka] vamossagar12 commented on pull request #11796: Kip 770

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1049024510


   @ableegoldman, @guozhangwang, @mjsax this is the new PR for KIP-770 (a follow-up to https://github.com/apache/kafka/pull/11424/).
   
   The last 3-4 commits hold the new set of changes. I haven't added the StreamConfigUtils class yet; I was thinking I could do that in a separate ticket. WDYT?





[GitHub] [kafka] vamossagar12 commented on a change in pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#discussion_r822308654



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##########
@@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
             log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
         } else {
-            maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+            maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+                    ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
         }
 
-        if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
-            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+        if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
+                isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+
+            if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",

Review comment:
       Yeah, makes sense. I have updated it to include the topology name along with the rest of the log message. I guess that should be fine?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##########
@@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
             log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
         } else {
-            maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+            maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+                    ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
         }
 
-        if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
-            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+        if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
+                isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+
+            if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",
+                        topologyName,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                        CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                        cacheSize);
+            } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+                cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+                log.info("Topology {} is using deprecated config {}. overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);

Review comment:
       Same as above.







[GitHub] [kafka] guozhangwang merged pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #11796:
URL: https://github.com/apache/kafka/pull/11796


   





[GitHub] [kafka] guozhangwang commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1063494547


   Re-triggered jenkins.





[GitHub] [kafka] vamossagar12 commented on pull request #11796: Kip 770

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1049465748


   @guozhangwang, this PR is showing a failure in streams:compileTestJava, but it seems to work locally.





[GitHub] [kafka] guozhangwang commented on pull request #11796: Kip 770

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1051079006


   Hello @vamossagar12, I checked out your branch and ran `streams:compileTestJava` on my local machine, and it also fails with:
   
   ```
   > Task :streams:compileTestJava
   /Users/guozhang/Workspace/github/guozhangwang/kafka-work/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java:95: warning: [deprecation] CACHE_MAX_BYTES_BUFFERING_CONFIG in org.apache.kafka.streams.StreamsConfig has been deprecated
                   mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
                                        ^
   error: warnings found and -Werror specified
   1 error
   1 warning
   
   > Task :streams:compileTestJava FAILED
   ```
   
   I guess your branch was not rebased on top of `trunk` and maybe that's why you did not see the failure?





[GitHub] [kafka] vamossagar12 commented on pull request #11796: Kip 770

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1053492332


   @guozhangwang I had rebased before pushing and hadn't noticed the issue. Not sure what went wrong there (maybe an oversight on my part). Anyway, I did another rebase, and now I can see the error. I have fixed it and pushed again.





[GitHub] [kafka] guozhangwang commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1063494691


   cc @ableegoldman 





[GitHub] [kafka] vamossagar12 commented on a change in pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#discussion_r837136993



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1038,14 +1041,15 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
             final StreamThread streamThread;
             synchronized (changeThreadCount) {
                 final int threadIdx = getNextThreadIndex();
-                final int numLiveThreads = getNumLiveStreamThreads();
-                final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
-                log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
-                         threadIdx, numLiveThreads + 1, cacheSizePerThread);
-                resizeThreadCache(cacheSizePerThread);
                 // Creating thread should hold the lock in order to avoid duplicate thread index.
                 // If the duplicate index happen, the metadata of thread may be duplicate too.
-                streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
+                // Also, we create the new thread with initial values of cache size and max buffer size as 0
+                // and then resize them later
+                streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
+                final int numLiveThreads = getNumLiveStreamThreads();
+                resizeThreadCacheAndBufferMemory(numLiveThreads + 1);

Review comment:
       Oh yeah, I forgot about it :( Thanks for the new PR. I will take a look.







[GitHub] [kafka] ableegoldman commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1081455587


   FYI @vamossagar12 here is the PR: https://github.com/apache/kafka/pull/11959





[GitHub] [kafka] vamossagar12 commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1074698895


   Thanks @guozhangwang !

