Posted to commits@kafka.apache.org by mj...@apache.org on 2022/07/07 18:20:02 UTC

[kafka] branch trunk updated: MINOR: revert KIP-770 (#12383)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 38b08dfd338 MINOR: revert KIP-770 (#12383)
38b08dfd338 is described below

commit 38b08dfd3384a04b73e236c83f1e3ce0fffeda45
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu Jul 7 11:19:37 2022 -0700

    MINOR: revert KIP-770 (#12383)
    
    KIP-770 introduced a performance regression and needs some redesign.
    
    Needed to resolve some conflicts while reverting.
    
    This reverts commits 1317f3f77a9e1e432e7a81de2dcb88365feeac43 and 0924fd3f9f75c446310ed1e97b44bbc3f33c6c31.
    
    Reviewers: Sagar Rao <sa...@gmail.com>, Guozhang Wang <gu...@confluent.io>
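
For reference, with this revert applied, trunk applications configure the record
cache via the pre-KIP-770 property again, as the example diffs below show; the
KIP-770 configs statestore.cache.max.bytes and input.buffer.max.bytes no longer
exist. A minimal sketch, using only identifiers that appear in this commit (the
bootstrap server is a placeholder):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class CacheConfigExample {
        public static Properties streamsProps() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cache-config-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Restored config: total record-cache bytes across all threads;
            // the Streams examples in this commit set it to 0 to disable caching.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
            return props;
        }
    }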
---
 checkstyle/suppressions.xml                        |   2 +-
 .../examples/pageview/PageViewTypedDemo.java       |   2 +-
 .../examples/pageview/PageViewUntypedDemo.java     |   2 +-
 .../examples/temperature/TemperatureDemo.java      |   2 +-
 .../streams/examples/wordcount/WordCountDemo.java  |   2 +-
 .../examples/wordcount/WordCountProcessorDemo.java |   2 +-
 .../wordcount/WordCountTransformerDemo.java        |   2 +-
 .../org/apache/kafka/streams/KafkaStreams.java     |  81 +++----
 .../org/apache/kafka/streams/StreamsConfig.java    |  23 --
 .../org/apache/kafka/streams/TopologyConfig.java   |  63 +-----
 .../streams/internals/StreamsConfigUtils.java      |  31 ---
 .../kafka/streams/kstream/CogroupedKStream.java    |   8 +-
 .../kafka/streams/kstream/KGroupedStream.java      |  20 +-
 .../kafka/streams/kstream/KGroupedTable.java       |  22 +-
 .../kstream/SessionWindowedCogroupedKStream.java   |   8 +-
 .../streams/kstream/SessionWindowedKStream.java    |  24 +--
 .../kstream/TimeWindowedCogroupedKStream.java      |   8 +-
 .../kafka/streams/kstream/TimeWindowedKStream.java |  24 +--
 .../processor/internals/PartitionGroup.java        |  42 +---
 .../streams/processor/internals/RecordQueue.java   |  11 -
 .../streams/processor/internals/StreamTask.java    |  19 +-
 .../streams/processor/internals/StreamThread.java  |  45 +---
 .../streams/processor/internals/TaskManager.java   |  19 --
 .../processor/internals/metrics/TaskMetrics.java   |  37 ----
 .../kafka/streams/state/internals/NamedCache.java  |  10 -
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  34 +--
 .../apache/kafka/streams/StreamsConfigTest.java    |  31 ---
 .../integration/AbstractJoinIntegrationTest.java   |   2 +-
 .../integration/AbstractResetIntegrationTest.java  |   2 +-
 .../integration/AdjustStreamThreadCountTest.java   |  40 +---
 .../integration/EmitOnChangeIntegrationTest.java   |   2 +-
 .../streams/integration/EosIntegrationTest.java    |   6 +-
 .../integration/EosV2UpgradeIntegrationTest.java   |   2 +-
 .../integration/ErrorHandlingIntegrationTest.java  | 161 --------------
 .../FineGrainedAutoResetIntegrationTest.java       |   4 +-
 .../GlobalKTableEOSIntegrationTest.java            |   2 +-
 .../integration/GlobalKTableIntegrationTest.java   |   2 +-
 .../integration/GlobalThreadShutDownOrderTest.java |   2 +-
 .../integration/InternalTopicIntegrationTest.java  |   2 +-
 .../KStreamAggregationDedupIntegrationTest.java    |   2 +-
 .../KStreamAggregationIntegrationTest.java         |   2 +-
 .../KStreamRepartitionIntegrationTest.java         |   2 +-
 ...yInnerJoinCustomPartitionerIntegrationTest.java |   1 -
 ...bleForeignKeyInnerJoinMultiIntegrationTest.java |   2 +-
 .../KTableSourceTopicRestartIntegrationTest.java   |   2 +-
 .../integration/MetricsIntegrationTest.java        |   6 +-
 .../integration/NamedTopologyIntegrationTest.java  |   2 +-
 .../OptimizedKTableIntegrationTest.java            |   2 +-
 .../integration/PauseResumeIntegrationTest.java    |   2 +-
 .../integration/QueryableStateIntegrationTest.java |   2 +-
 .../integration/RegexSourceIntegrationTest.java    |   2 +-
 .../integration/RestoreIntegrationTest.java        |   2 +-
 .../integration/RocksDBMetricsIntegrationTest.java |   2 +-
 .../SlidingWindowedKStreamIntegrationTest.java     |   2 +-
 .../integration/StandbyTaskEOSIntegrationTest.java |   2 +-
 .../integration/StoreUpgradeIntegrationTest.java   |   2 +-
 ...bleJoinTopologyOptimizationIntegrationTest.java |   2 +-
 .../TimeWindowedKStreamIntegrationTest.java        |   2 +-
 .../kstream/internals/KTableFilterTest.java        |   4 +-
 .../internals/SessionWindowedKStreamImplTest.java  |   2 +-
 .../internals/InternalTopologyBuilderTest.java     |  18 +-
 .../processor/internals/PartitionGroupTest.java    |  89 --------
 .../internals/RepartitionOptimizingTest.java       |   2 +-
 .../RepartitionWithMergeOptimizingTest.java        |   2 +-
 .../processor/internals/StandbyTaskTest.java       |   1 +
 .../processor/internals/StreamTaskTest.java        |   1 -
 .../processor/internals/StreamThreadTest.java      | 232 +--------------------
 .../internals/metrics/TaskMetricsTest.java         |  48 +----
 .../streams/state/internals/NamedCacheTest.java    |  12 +-
 .../streams/tests/BrokerCompatibilityTest.java     |   2 +-
 .../apache/kafka/streams/tests/EosTestClient.java  |   2 +-
 .../streams/tests/StreamsNamedRepartitionTest.java |   2 +-
 .../kafka/streams/tests/StreamsOptimizedTest.java  |   2 +-
 .../streams/tests/StreamsStandByReplicaTest.java   |   2 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   6 +-
 75 files changed, 189 insertions(+), 1080 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 033017f0fe3..f6ca0d02fe3 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -188,7 +188,7 @@
               files="StreamsMetricsImpl.java"/>
 
     <suppress checks="NPathComplexity"
-              files="(GlobalStateManagerImpl|KafkaStreams|KStreamImplJoin|StreamsPartitionAssignor|StreamThread|TaskManager|TopologyConfig).java"/>
+              files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|GlobalStateManagerImpl|KStreamImplJoin).java"/>
 
     <suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
               files="Murmur3.java"/>
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index be54bafca2b..a5086de8c66 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -179,7 +179,7 @@ public class PageViewTypedDemo {
         props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class);
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 8fc874488ab..cdb36394a98 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -60,7 +60,7 @@ public class PageViewUntypedDemo {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index 6e40fa03066..3dc8eda25f1 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -77,7 +77,7 @@ public class TemperatureDemo {
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
 
         final Duration duration24Hours = Duration.ofHours(24);
 
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index d290c660bbf..4ca5d73f1d8 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -61,7 +61,7 @@ public final class WordCountDemo {
         }
         props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
         props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 6204c422bc0..014923fb64c 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -108,7 +108,7 @@ public final class WordCountProcessorDemo {
 
         props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
         props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
index 90f4764be7f..617e491e111 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
@@ -131,7 +131,7 @@ public final class WordCountTransformerDemo {
         }
         props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-transformer");
         props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2c95aa85a45..a923c8e983f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -109,7 +109,6 @@ import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CON
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
-import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
 import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
 
@@ -176,7 +175,6 @@ public class KafkaStreams implements AutoCloseable {
     protected final Admin adminClient;
     private final StreamsMetricsImpl streamsMetrics;
     private final long totalCacheSize;
-    private final long inputBufferMaxBytes;
     private final StreamStateListener streamStateListener;
     private final StateRestoreListener delegatingStateRestoreListener;
     private final Map<Long, StreamThread.State> threadState;
@@ -941,9 +939,9 @@ public class KafkaStreams implements AutoCloseable {
         streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
         delegatingStateRestoreListener = new DelegatingStateRestoreListener();
 
-        totalCacheSize = getTotalCacheSize(applicationConfigs);
-        inputBufferMaxBytes = applicationConfigs.getLong(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG);
+        totalCacheSize = applicationConfigs.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
         final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs);
+        final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads);
 
         GlobalStreamThread.State globalThreadState = null;
         if (hasGlobalTopology) {
@@ -953,7 +951,7 @@ public class KafkaStreams implements AutoCloseable {
                 applicationConfigs,
                 clientSupplier.getGlobalConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId)),
                 stateDirectory,
-                0L,
+                cacheSizePerThread,
                 streamsMetrics,
                 time,
                 globalThreadId,
@@ -974,20 +972,14 @@ public class KafkaStreams implements AutoCloseable {
 
         queryableStoreProvider = new QueryableStoreProvider(globalStateStoreProvider);
         for (int i = 1; i <= numStreamThreads; i++) {
-            createAndAddStreamThread(0L, 0L, i);
-        }
-        // Initially, all Stream Threads are created with 0 cache size and max buffer size and then resized here.
-        resizeThreadCacheAndBufferMemory(numStreamThreads);
-        if (numStreamThreads > 0) {
-            log.info("Initializing {} StreamThread with cache size/max buffer size values as {} per thread.",
-                numStreamThreads, getThreadCacheAndBufferMemoryString());
+            createAndAddStreamThread(cacheSizePerThread, i);
         }
 
         stateDirCleaner = setupStateDirCleaner();
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs);
     }
 
-    private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final long maxBufferSizePerThread, final int threadIdx) {
+    private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final int threadIdx) {
         final StreamThread streamThread = StreamThread.create(
             topologyMetadata,
             applicationConfigs,
@@ -999,7 +991,6 @@ public class KafkaStreams implements AutoCloseable {
             time,
             streamsMetadataState,
             cacheSizePerThread,
-            maxBufferSizePerThread,
             stateDirectory,
             delegatingStateRestoreListener,
             threadIdx,
@@ -1036,7 +1027,7 @@ public class KafkaStreams implements AutoCloseable {
      * Since the number of stream threads increases, the sizes of the caches in the new stream thread
      * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
      * threads does not exceed the total cache size specified in configuration
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
      * <p>
      * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
      *
@@ -1047,15 +1038,14 @@ public class KafkaStreams implements AutoCloseable {
             final StreamThread streamThread;
             synchronized (changeThreadCount) {
                 final int threadIdx = getNextThreadIndex();
+                final int numLiveThreads = getNumLiveStreamThreads();
+                final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
+                log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
+                         threadIdx, numLiveThreads + 1, cacheSizePerThread);
+                resizeThreadCache(cacheSizePerThread);
                 // Creating thread should hold the lock in order to avoid duplicate thread index.
                 // If the duplicate index happen, the metadata of thread may be duplicate too.
-                // Also, we create the new thread with initial values of cache size and max buffer size as 0
-                // and then resize them later
-                streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
-                final int numLiveThreads = getNumLiveStreamThreads();
-                resizeThreadCacheAndBufferMemory(numLiveThreads);
-                log.info("Adding StreamThread-{}, there are now {} threads with cache size/max buffer size values as {} per thread.",
-                        threadIdx, numLiveThreads, getThreadCacheAndBufferMemoryString());
+                streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
             }
 
             synchronized (stateLock) {
@@ -1066,9 +1056,9 @@ public class KafkaStreams implements AutoCloseable {
                     log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state);
                     streamThread.shutdown();
                     threads.remove(streamThread);
-                    resizeThreadCacheAndBufferMemory(getNumLiveStreamThreads());
-                    log.info("Resizing thread cache and max buffer size per thread since new thread can not be " +
-                                    "started, cache size/max buffer size per thread is {}", getThreadCacheAndBufferMemoryString());
+                    final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
+                    log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
+                    resizeThreadCache(cacheSizePerThread);
                     return Optional.empty();
                 }
             }
@@ -1086,7 +1076,7 @@ public class KafkaStreams implements AutoCloseable {
      * <p>
      * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
      * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
-     * cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
      *
      * @return name of the removed stream thread or empty if a stream thread could not be removed because
      *         no stream threads are alive
@@ -1101,10 +1091,9 @@ public class KafkaStreams implements AutoCloseable {
      * The removed stream thread is gracefully shut down. This method does not specify which stream
      * thread is shut down.
      * <p>
-     * Since the number of stream threads decreases, the sizes of the caches and buffer bytes in the remaining stream
-     * threads are adapted so that the sum of the cache sizes and buffer bytes over all stream threads equals the total
-     * cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG} and
-     * {@link StreamsConfig#INPUT_BUFFER_MAX_BYTES_CONFIG} respectively.
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
      *
      * @param timeout The length of time to wait for the thread to shutdown
      * @throws org.apache.kafka.common.errors.TimeoutException if the thread does not stop in time
@@ -1144,15 +1133,16 @@ public class KafkaStreams implements AutoCloseable {
                             }
                         } else {
                             log.info("{} is the last remaining thread and must remove itself, therefore we cannot wait "
-                                    + "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
+                                + "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
                         }
-                        resizeThreadCacheAndBufferMemory(getNumLiveStreamThreads());
-                        log.info("Resizing thread cache/max buffer size due to removal of thread {}, " +
-                            "new cache size/max buffer size per thread is {}", streamThread.getName(), getThreadCacheAndBufferMemoryString());
+
+                        final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
+                        log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread);
+                        resizeThreadCache(cacheSizePerThread);
                         if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
                             final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get());
                             final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
-                            final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult =
+                            final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = 
                                 adminClient.removeMembersFromConsumerGroup(
                                     applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
                                     new RemoveMembersFromConsumerGroupOptions(membersToRemove)
@@ -1251,22 +1241,15 @@ public class KafkaStreams implements AutoCloseable {
         }
     }
 
-    private String getThreadCacheAndBufferMemoryString() {
-        final StreamThread streamThread = threads.get(0);
-        return streamThread.getCacheSize() + "/" + streamThread.getMaxBufferSize();
-    }
-
-    private void resizeThreadCacheAndBufferMemory(final int numStreamThreads) {
-        final long cacheSizePerThread;
-        final long inputBufferMaxBytesPerThread;
+    private long getCacheSizePerThread(final int numStreamThreads) {
         if (numStreamThreads == 0) {
-            cacheSizePerThread = totalCacheSize;
-            inputBufferMaxBytesPerThread = inputBufferMaxBytes;
-        } else {
-            cacheSizePerThread = totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
-            inputBufferMaxBytesPerThread = inputBufferMaxBytes / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
+            return totalCacheSize;
         }
-        processStreamThread(thread -> thread.resizeCacheAndBufferMemory(cacheSizePerThread, inputBufferMaxBytesPerThread));
+        return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final long cacheSizePerThread) {
+        processStreamThread(thread -> thread.resizeCache(cacheSizePerThread));
         if (globalStreamThread != null) {
             globalStreamThread.resize(cacheSizePerThread);
         }
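
The hunk above restores the even split of the configured total cache across
threads, with one extra share reserved for the global thread when a global
topology exists. A standalone sketch of that arithmetic (class and method
names here are illustrative, not part of the commit):

    // Mirrors the restored getCacheSizePerThread logic in KafkaStreams.
    final class CacheMath {
        static long cacheSizePerThread(final long totalCacheSize,
                                       final int numStreamThreads,
                                       final boolean hasGlobalTopology) {
            if (numStreamThreads == 0) {
                // No stream threads: the whole cache goes to the global thread.
                return totalCacheSize;
            }
            return totalCacheSize / (numStreamThreads + (hasGlobalTopology ? 1 : 0));
        }
    }

For example, a 10 MiB total with two stream threads plus a global topology
yields 10485760 / 3 = 3495253 bytes per thread.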
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index bc52c9f9b18..9975bd4680d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -419,30 +419,18 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code buffered.records.per.partition} */
     @SuppressWarnings("WeakerAccess")
-    @Deprecated
     public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
     public static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum number of records to buffer per partition.";
 
-    /** {@code input.buffer.max.bytes} */
-    @SuppressWarnings("WeakerAccess")
-    public static final String INPUT_BUFFER_MAX_BYTES_CONFIG = "input.buffer.max.bytes";
-    public static final String INPUT_BUFFER_MAX_BYTES_DOC = "Maximum bytes of records to buffer across all threads";
-
     /** {@code built.in.metrics.version} */
     public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version";
     private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the built-in metrics to use.";
 
     /** {@code cache.max.bytes.buffering} */
     @SuppressWarnings("WeakerAccess")
-    @Deprecated
     public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
     public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
 
-    /** {@statestore.cache.max.bytes} */
-    @SuppressWarnings("WeakerAccess")
-    public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes";
-    public static final String STATESTORE_CACHE_MAX_BYTES_DOC = "Maximum number of memory bytes to be used for statestore cache across all threads";
-
     /** {@code client.id} */
     @SuppressWarnings("WeakerAccess")
     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
@@ -755,12 +743,6 @@ public class StreamsConfig extends AbstractConfig {
                     atLeast(0),
                     Importance.MEDIUM,
                     CACHE_MAX_BYTES_BUFFERING_DOC)
-            .define(STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                    Type.LONG,
-                    10 * 1024 * 1024L,
-                    atLeast(0),
-                    Importance.MEDIUM,
-                    STATESTORE_CACHE_MAX_BYTES_DOC)
             .define(CLIENT_ID_CONFIG,
                     Type.STRING,
                     "",
@@ -862,11 +844,6 @@ public class StreamsConfig extends AbstractConfig {
                     in(NO_OPTIMIZATION, OPTIMIZE),
                     Importance.MEDIUM,
                     TOPOLOGY_OPTIMIZATION_DOC)
-            .define(INPUT_BUFFER_MAX_BYTES_CONFIG,
-                    Type.LONG,
-                    512 * 1024 * 1024,
-                    Importance.MEDIUM,
-                    INPUT_BUFFER_MAX_BYTES_DOC)
 
             // LOW
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index da9a8c89bfa..c4bc85656eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -32,10 +32,9 @@ import java.util.Properties;
 import java.util.function.Supplier;
 
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
-import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
-import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_DOC;
 import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC;
@@ -49,7 +48,6 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC;
 import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB;
 import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
-import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
 
 /**
  * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
@@ -57,21 +55,15 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCach
  * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
  * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method.
  */
-@SuppressWarnings("deprecation")
 public class TopologyConfig extends AbstractConfig {
     private static final ConfigDef CONFIG;
     static {
         CONFIG = new ConfigDef()
-         .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
-                 Type.INT,
-                 null,
-                 Importance.LOW,
-                 BUFFERED_RECORDS_PER_PARTITION_DOC)
-        .define(STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                Type.LONG,
+            .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+                Type.INT,
                 null,
-                Importance.MEDIUM,
-                CACHE_MAX_BYTES_BUFFERING_DOC)
+                Importance.LOW,
+                BUFFERED_RECORDS_PER_PARTITION_DOC)
             .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
                 Type.LONG,
                 null,
@@ -137,49 +129,14 @@ public class TopologyConfig extends AbstractConfig {
             maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
             log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
         } else {
-            // If the user hasn't explicitly set the buffered.records.per.partition config, then leave it unbounded
-            // and rely on the input.buffer.max.bytes instead to keep the memory usage under control
-            maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
-                    ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
+            maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
         }
 
-        final boolean stateStoreCacheMaxBytesOverridden = isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides);
-        final boolean cacheMaxBytesBufferingOverridden = isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides);
-
-        if (!stateStoreCacheMaxBytesOverridden && !cacheMaxBytesBufferingOverridden) {
-            cacheSize = getTotalCacheSize(globalAppConfigs);
+        if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
         } else {
-            if (stateStoreCacheMaxBytesOverridden && cacheMaxBytesBufferingOverridden) {
-                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-                log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used",
-                         topologyName,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                         STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                         STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                         cacheSize);
-            } else if (cacheMaxBytesBufferingOverridden) {
-                cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-                log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " +
-                             "we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
-                         topologyName,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                         cacheSize,
-                         STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            } else {
-                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-            }
-
-            if (cacheSize != 0) {
-                log.warn("Topology {} is overriding cache size to {} but this will not have any effect as the "
-                             + "topology-level cache size config only controls whether record buffering is enabled "
-                             + "or disabled, thus the only valid override value is 0",
-                         topologyName, cacheSize);
-            } else {
-                log.info("Topology {} is overriding cache size to {}, record buffering will be disabled",
-                         topologyName, cacheSize);
-            }
+            cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
         }
 
         if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
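
The restored branch above reads the topology-level override of
cache.max.bytes.buffering when present and otherwise falls back to the global
application config. A hedged sketch of such an override; the
TopologyConfig(String, StreamsConfig, Properties) constructor and the
StreamsBuilder(TopologyConfig) overload are inferred from the Javadoc above
and are assumptions, not part of this diff:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TopologyConfig;

    public class TopologyOverrideExample {
        public static StreamsBuilder builderWithCacheDisabled(final StreamsConfig appConfigs) {
            final Properties overrides = new Properties();
            // Per-topology override of the restored cache config; all other
            // values fall back to the global application configs.
            overrides.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
            return new StreamsBuilder(new TopologyConfig("example-topology", appConfigs, overrides));
        }
    }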
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
index 6f169826f06..e271a42ab89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
@@ -17,16 +17,9 @@
 package org.apache.kafka.streams.internals;
 
 import org.apache.kafka.streams.StreamsConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
-import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
 
 public class StreamsConfigUtils {
 
-    private static final Logger LOG = LoggerFactory.getLogger(StreamsConfigUtils.class);
-
     public enum ProcessingMode {
         AT_LEAST_ONCE("AT_LEAST_ONCE"),
 
@@ -73,28 +66,4 @@ public class StreamsConfigUtils {
         return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA ||
             processingMode == ProcessingMode.EXACTLY_ONCE_V2;
     }
-
-    @SuppressWarnings("deprecation")
-    public static long getTotalCacheSize(final StreamsConfig config) {
-        // both deprecated and new config set. Warn and use the new one.
-        if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && config.originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
-            if (!config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) {
-                LOG.warn("Both deprecated config {} and the new config {} are set, hence {} is ignored and {} is used instead.",
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                         STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                         STATESTORE_CACHE_MAX_BYTES_CONFIG);
-            }
-            return config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-        } else if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG)) {
-            // only deprecated config set.
-            LOG.warn("Deprecated config {} is set, and will be used; we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
-                     CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                     STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                     CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            return config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-        }
-        // only new or no config set. Use default or user specified value.
-        return config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index b0f1deca1cb..051396fbc94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -78,7 +78,7 @@ public interface CogroupedKStream<K, VOut> {
      * same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -128,7 +128,7 @@ public interface CogroupedKStream<K, VOut> {
      * same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -179,7 +179,7 @@ public interface CogroupedKStream<K, VOut> {
      * same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -232,7 +232,7 @@ public interface CogroupedKStream<K, VOut> {
      * same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore} it must be obtained via
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 513d94dae65..072558cf6ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -53,7 +53,7 @@ public interface KGroupedStream<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by
@@ -81,7 +81,7 @@ public interface KGroupedStream<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by
@@ -112,7 +112,7 @@ public interface KGroupedStream<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -158,7 +158,7 @@ public interface KGroupedStream<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -211,7 +211,7 @@ public interface KGroupedStream<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      *
      * <p>
@@ -262,7 +262,7 @@ public interface KGroupedStream<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -326,7 +326,7 @@ public interface KGroupedStream<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -385,7 +385,7 @@ public interface KGroupedStream<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      *
      * <p>
@@ -431,7 +431,7 @@ public interface KGroupedStream<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -490,7 +490,7 @@ public interface KGroupedStream<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
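
The Javadoc lines above all make the same point: downstream update frequency
for cached aggregations is governed by the cache size and the commit interval.
A minimal sketch of the two knobs (the values are illustrative):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class UpdateRateExample {
        public static Properties eagerUpdates() {
            final Properties props = new Properties();
            // Cache disabled: every input record produces a downstream update.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            // With caching enabled instead, updates flush at least on commit.
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
            return props;
        }
    }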
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 5733aef319c..06d12e1d4ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -52,7 +52,7 @@ public interface KGroupedTable<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -95,7 +95,7 @@ public interface KGroupedTable<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -138,7 +138,7 @@ public interface KGroupedTable<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -167,7 +167,7 @@ public interface KGroupedTable<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -223,7 +223,7 @@ public interface KGroupedTable<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -296,7 +296,7 @@ public interface KGroupedTable<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -368,7 +368,7 @@ public interface KGroupedTable<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -434,7 +434,7 @@ public interface KGroupedTable<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -518,7 +518,7 @@ public interface KGroupedTable<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -604,7 +604,7 @@ public interface KGroupedTable<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
      * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
@@ -674,7 +674,7 @@ public interface KGroupedTable<K, V> {
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
      * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
index eeeb3e1a036..b7e3b07e371 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
@@ -77,7 +77,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -122,7 +122,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -166,7 +166,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
      * the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
@@ -226,7 +226,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
      * to the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index fe897515a9d..a9eaddbee48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -65,7 +65,7 @@ public interface SessionWindowedKStream<K, V> {
      * the same session and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -95,7 +95,7 @@ public interface SessionWindowedKStream<K, V> {
      * the same session and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -126,7 +126,7 @@ public interface SessionWindowedKStream<K, V> {
      * to the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
@@ -172,7 +172,7 @@ public interface SessionWindowedKStream<K, V> {
      * to the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
@@ -233,7 +233,7 @@ public interface SessionWindowedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -282,7 +282,7 @@ public interface SessionWindowedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -330,7 +330,7 @@ public interface SessionWindowedKStream<K, V> {
      * the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
@@ -391,7 +391,7 @@ public interface SessionWindowedKStream<K, V> {
      * to the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
@@ -459,7 +459,7 @@ public interface SessionWindowedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -504,7 +504,7 @@ public interface SessionWindowedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -549,7 +549,7 @@ public interface SessionWindowedKStream<K, V> {
      * to the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
@@ -609,7 +609,7 @@ public interface SessionWindowedKStream<K, V> {
      * to the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
index a46da057a08..e4178bc9e3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
@@ -75,7 +75,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -115,7 +115,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -156,7 +156,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
      * the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyWindowStore} it must be obtained via
@@ -213,7 +213,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
      * to the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyWindowStore} it must be obtained via
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 3f36838f202..122f73f7a70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -65,7 +65,7 @@ public interface TimeWindowedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -95,7 +95,7 @@ public interface TimeWindowedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -126,7 +126,7 @@ public interface TimeWindowedKStream<K, V> {
      * to the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyWindowStore} it must be obtained via
@@ -175,7 +175,7 @@ public interface TimeWindowedKStream<K, V> {
      * to the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyWindowStore} it must be obtained via
@@ -236,7 +236,7 @@ public interface TimeWindowedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -281,7 +281,7 @@ public interface TimeWindowedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -326,7 +326,7 @@ public interface TimeWindowedKStream<K, V> {
      * the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyWindowStore} it must be obtained via
@@ -387,7 +387,7 @@ public interface TimeWindowedKStream<K, V> {
      * to the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyWindowStore} it must be obtained via
@@ -457,7 +457,7 @@ public interface TimeWindowedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -502,7 +502,7 @@ public interface TimeWindowedKStream<K, V> {
      * the same window and key.
      * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -547,7 +547,7 @@ public interface TimeWindowedKStream<K, V> {
      * to the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyWindowStore} it must be obtained via
@@ -610,7 +610,7 @@ public interface TimeWindowedKStream<K, V> {
      * to the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
      * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
-     * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
+     * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
      * To query the local {@link ReadOnlyWindowStore} it must be obtained via
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 750699a1ec1..f6b5d800fdf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -58,18 +58,16 @@ import java.util.function.Function;
  */
 public class PartitionGroup {
 
-    private  final Logger logger;
+    private final Logger logger;
     private final Map<TopicPartition, RecordQueue> partitionQueues;
     private final Function<TopicPartition, OptionalLong> lagProvider;
     private final Sensor enforcedProcessingSensor;
     private final long maxTaskIdleMs;
     private final Sensor recordLatenessSensor;
-    private final Sensor totalInputBufferBytesSensor;
     private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
 
     private long streamTime;
     private int totalBuffered;
-    private long totalBytesBuffered;
     private boolean allBuffered;
     private final Map<TopicPartition, Long> idlePartitionDeadlines = new HashMap<>();
 
@@ -93,7 +91,6 @@ public class PartitionGroup {
                    final Map<TopicPartition, RecordQueue> partitionQueues,
                    final Function<TopicPartition, OptionalLong> lagProvider,
                    final Sensor recordLatenessSensor,
-                   final Sensor totalInputBufferBytesSensor,
                    final Sensor enforcedProcessingSensor,
                    final long maxTaskIdleMs) {
         this.logger = logContext.logger(PartitionGroup.class);
@@ -103,7 +100,6 @@ public class PartitionGroup {
         this.enforcedProcessingSensor = enforcedProcessingSensor;
         this.maxTaskIdleMs = maxTaskIdleMs;
         this.recordLatenessSensor = recordLatenessSensor;
-        this.totalInputBufferBytesSensor = totalInputBufferBytesSensor;
         totalBuffered = 0;
         allBuffered = false;
         streamTime = RecordQueue.UNKNOWN;
@@ -229,8 +225,6 @@ public class PartitionGroup {
             if (!newInputPartitions.contains(topicPartition)) {
                 // if partition is removed should delete its queue
                 totalBuffered -= queueEntry.getValue().size();
-                totalBytesBuffered -= queueEntry.getValue().getTotalBytesBuffered();
-                totalInputBufferBytesSensor.record(totalBytesBuffered);
                 queuesIterator.remove();
                 removedPartitions.add(topicPartition);
             }
@@ -266,17 +260,12 @@ public class PartitionGroup {
         info.queue = queue;
 
         if (queue != null) {
-            // get the buffer size of queue before poll
-            final long oldBufferSize = queue.getTotalBytesBuffered();
             // get the first record from this queue.
             record = queue.poll(wallClockTime);
-            // After polling, the buffer size would have reduced.
-            final long newBufferSize = queue.getTotalBytesBuffered();
 
             if (record != null) {
                 --totalBuffered;
-                totalBytesBuffered -= oldBufferSize - newBufferSize;
-                totalInputBufferBytesSensor.record(totalBytesBuffered);
+
                 if (queue.isEmpty()) {
                     // if a certain queue has been drained, reset the flag
                     allBuffered = false;
@@ -300,8 +289,8 @@ public class PartitionGroup {
     /**
      * Adds raw records to this partition group
      *
-     * @param partition the partition
-     * @param rawRecords  the raw records
+     * @param partition  the partition
+     * @param rawRecords the raw records
      * @return the queue size for the partition
      */
     int addRawRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
@@ -312,9 +301,7 @@ public class PartitionGroup {
         }
 
         final int oldSize = recordQueue.size();
-        final long oldBufferSize = recordQueue.getTotalBytesBuffered();
         final int newSize = recordQueue.addRawRecords(rawRecords);
-        final long newBufferSize = recordQueue.getTotalBytesBuffered();
 
         // add this record queue to be considered for processing in the future if it was empty before
         if (oldSize == 0 && newSize > 0) {
@@ -329,8 +316,7 @@ public class PartitionGroup {
         }
 
         totalBuffered += newSize - oldSize;
-        totalBytesBuffered += newBufferSize - oldBufferSize;
-        totalInputBufferBytesSensor.record(totalBytesBuffered);
+
         return newSize;
     }
 
@@ -368,20 +354,12 @@ public class PartitionGroup {
         return recordQueue.size();
     }
 
-    Set<TopicPartition> getNonEmptyTopicPartitions() {
-        final Set<TopicPartition> nonEmptyTopicPartitions = new HashSet<>();
-        for (final RecordQueue recordQueue : nonEmptyQueuesByTime) {
-            nonEmptyTopicPartitions.add(recordQueue.partition());
-        }
-        return nonEmptyTopicPartitions;
-    }
-
     int numBuffered() {
         return totalBuffered;
     }
 
-    long totalBytesBuffered() {
-        return totalBytesBuffered;
+    boolean allPartitionsBufferedLocally() {
+        return allBuffered;
     }
 
     void clear() {
@@ -398,10 +376,4 @@ public class PartitionGroup {
             queue.close();
         }
     }
-
-    // Below methods are for only testing.
-
-    boolean allPartitionsBufferedLocally() {
-        return allBuffered;
-    }
 }
\ No newline at end of file
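The PartitionGroup hunks above, together with the RecordQueue hunks that follow, strip the byte-level accounting KIP-770 had layered on top of the existing record counting: an approximate per-record size was added on addRawRecords and subtracted on poll. A rough standalone sketch of such a size estimate (this is an approximation for illustration, not necessarily the exact formula the removed code used):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Header;

    static long approximateSizeInBytes(final ConsumerRecord<byte[], byte[]> record) {
        long size = 0L;
        size += Math.max(record.serializedKeySize(), 0);    // -1 means no key
        size += Math.max(record.serializedValueSize(), 0);  // -1 means no value
        for (final Header header : record.headers()) {
            size += header.key().getBytes(StandardCharsets.UTF_8).length;
            size += header.value() == null ? 0 : header.value().length;
        }
        return size;
    }
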
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 8aa58414a9d..297c2876738 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -55,7 +55,6 @@ public class RecordQueue {
 
     private final Sensor droppedRecordsSensor;
     private final Sensor consumedSensor;
-    private long totalBytesBuffered;
     private long headRecordSizeInBytes;
 
     RecordQueue(final TopicPartition partition,
@@ -90,7 +89,6 @@ public class RecordQueue {
             droppedRecordsSensor
         );
         this.log = logContext.logger(RecordQueue.class);
-        this.totalBytesBuffered = 0L;
         this.headRecordSizeInBytes = 0L;
     }
 
@@ -125,7 +123,6 @@ public class RecordQueue {
     int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
         for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
             fifoQueue.addLast(rawRecord);
-            this.totalBytesBuffered += consumerRecordSizeInBytes(rawRecord);
         }
 
         updateHead();
@@ -143,7 +140,6 @@ public class RecordQueue {
 
         consumedSensor.record(headRecordSizeInBytes, wallClockTime);
 
-        totalBytesBuffered -= headRecordSizeInBytes;
         headRecord = null;
         headRecordSizeInBytes = 0L;
         partitionTime = Math.max(partitionTime, recordToReturn.timestamp);
@@ -251,11 +247,4 @@ public class RecordQueue {
     long partitionTime() {
         return partitionTime;
     }
-
-    /**
-     * @return the total bytes buffered for this particular RecordQueue
-     */
-    long getTotalBytesBuffered() {
-        return totalBytesBuffered;
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 33751e59d88..5b62e3f579c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -176,14 +176,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
         recordInfo = new PartitionGroup.RecordInfo();
 
+        final Sensor enforcedProcessingSensor;
+        enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
         final long maxTaskIdleMs = config.maxTaskIdleMs;
         partitionGroup = new PartitionGroup(
             logContext,
             createPartitionQueues(),
             mainConsumer::currentLag,
             TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics),
-            TaskMetrics.totalInputBufferBytesSensor(threadId, taskId, streamsMetrics),
-            TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics),
+            enforcedProcessingSensor,
             maxTaskIdleMs
         );
 
@@ -729,8 +730,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
             // after processing this record, if its partition queue's buffered size has been
             // decreased to the threshold, we can then resume the consumption on this partition
-            // TODO maxBufferedSize != -1 would be removed once the deprecated config buffered.records.per.partition is removed
-            if (maxBufferedSize != -1 && recordInfo.queue().size() == maxBufferedSize) {
+            if (recordInfo.queue().size() == maxBufferedSize) {
                 mainConsumer.resume(singleton(partition));
             }
 
@@ -990,8 +990,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
         // if after adding these records, its partition queue's buffered size has been
         // increased beyond the threshold, we can then pause the consumption for this partition
-        // We do this only if the deprecated config buffered.records.per.partition is set
-        if (maxBufferedSize != -1 && newQueueSize > maxBufferedSize) {
+        if (newQueueSize > maxBufferedSize) {
             mainConsumer.pause(singleton(partition));
         }
     }
@@ -1244,14 +1243,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return recordCollector;
     }
 
-    Set<TopicPartition> getNonEmptyTopicPartitions() {
-        return this.partitionGroup.getNonEmptyTopicPartitions();
-    }
-
-    long totalBytesBuffered() {
-        return partitionGroup.totalBytesBuffered();
-    }
-
     // below are visible for testing only
     int numBuffered() {
         return partitionGroup.numBuffered();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2e574fd9f6b..8b6820fc220 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -318,7 +318,6 @@ public class StreamThread extends Thread {
     // These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
     private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
     private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
-    private final AtomicLong maxBufferSizeBytes = new AtomicLong(-1L);
     private final boolean eosEnabled;
 
     public static StreamThread create(final TopologyMetadata topologyMetadata,
@@ -331,7 +330,6 @@ public class StreamThread extends Thread {
                                       final Time time,
                                       final StreamsMetadataState streamsMetadataState,
                                       final long cacheSizeBytes,
-                                      final long maxBufferSizeBytes,
                                       final StateDirectory stateDirectory,
                                       final StateRestoreListener userStateRestoreListener,
                                       final int threadIdx,
@@ -433,8 +431,7 @@ public class StreamThread extends Thread {
             referenceContainer.nonFatalExceptionsToHandle,
             shutdownErrorHook,
             streamsUncaughtExceptionHandler,
-            cache::resize,
-            maxBufferSizeBytes
+            cache::resize
         );
 
         return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
@@ -457,8 +454,7 @@ public class StreamThread extends Thread {
                         final Queue<StreamsException> nonFatalExceptionsToHandle,
                         final Runnable shutdownErrorHook,
                         final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler,
-                        final java.util.function.Consumer<Long> cacheResizer,
-                        final long maxBufferSizeBytes) {
+                        final java.util.function.Consumer<Long> cacheResizer) {
         super(threadId);
         this.stateLock = new Object();
         this.adminClient = adminClient;
@@ -527,7 +523,6 @@ public class StreamThread extends Thread {
 
         this.numIterations = 1;
         this.eosEnabled = eosEnabled(config);
-        this.maxBufferSizeBytes.set(maxBufferSizeBytes);
     }
 
     private static final class InternalConsumerConfig extends ConsumerConfig {
@@ -710,17 +705,8 @@ public class StreamThread extends Thread {
         }
     }
 
-    public void resizeCacheAndBufferMemory(final long cacheSize, final long maxBufferSize) {
-        cacheResizeSize.set(cacheSize);
-        maxBufferSizeBytes.set(maxBufferSize);
-    }
-
-    public long getCacheSize() {
-        return cacheResizeSize.get();
-    }
-
-    public long getMaxBufferSize() {
-        return maxBufferSizeBytes.get();
+    public void resizeCache(final long size) {
+        cacheResizeSize.set(size);
     }
 
     /**
@@ -795,13 +781,6 @@ public class StreamThread extends Thread {
 
                     totalProcessed += processed;
                     totalRecordsProcessedSinceLastSummary += processed;
-                    final long bufferSize = taskManager.getInputBufferSizeInBytes();
-                    if (bufferSize <= maxBufferSizeBytes.get()) {
-                        final Set<TopicPartition> pausedPartitions = mainConsumer.paused();
-                        log.info("Buffered records size {} bytes falls below {}. Resuming all the paused partitions {} in the consumer",
-                            bufferSize, maxBufferSizeBytes.get(), pausedPartitions);
-                        mainConsumer.resume(pausedPartitions);
-                    }
                 }
 
                 log.debug("Processed {} records with {} iterations; invoking punctuators if necessary",
@@ -920,8 +899,7 @@ public class StreamThread extends Thread {
         }
     }
 
-    // Visible for testing
-    long pollPhase() {
+    private long pollPhase() {
         final ConsumerRecords<byte[], byte[]> records;
         log.debug("Invoking poll on main Consumer");
 
@@ -967,19 +945,6 @@ public class StreamThread extends Thread {
         if (!records.isEmpty()) {
             pollRecordsSensor.record(numRecords, now);
             taskManager.addRecordsToTasks(records);
-            // Check buffer size after adding records to tasks
-            final long bufferSize = taskManager.getInputBufferSizeInBytes();
-            // Pausing partitions as the buffer size now exceeds max buffer size
-            if (bufferSize > maxBufferSizeBytes.get()) {
-                final Set<TopicPartition> nonEmptyPartitions = taskManager.nonEmptyPartitions();
-                log.info("Buffered records size {} bytes exceeds {}. Pausing partitions {} from the consumer",
-                    bufferSize, maxBufferSizeBytes.get(), nonEmptyPartitions);
-                // Only non-empty partitions are paused here. Reason is that, if a task has multiple partitions with
-                // some of them empty, then in that case pausing even empty partitions would sacrifice ordered processing
-                // and even lead to temporal deadlock. More explanation can be found here:
-                // https://issues.apache.org/jira/browse/KAFKA-13152?focusedCommentId=17400647&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17400647
-                mainConsumer.pause(nonEmptyPartitions);
-            }
         }
 
         while (!nonFatalExceptionsToHandle.isEmpty()) {
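The StreamThread hunks remove the global cap that paused non-empty partitions once the summed buffer size crossed input.buffer.max.bytes and resumed them after processing drained the buffer; the StreamTask hunks above remove the analogous per-partition record-count guard. The consumer-level mechanism underneath both is the ordinary pause/resume API. A minimal sketch of that backpressure pattern, where the cap, the byte accounting, and the process(...) step are illustrative stand-ins rather than Streams internals:

    // Assumes a KafkaConsumer<byte[], byte[]> named consumer, a running flag,
    // and a process(...) step returning the bytes drained -- all illustrative.
    long bufferedBytes = 0L;
    final long maxBufferedBytes = 512 * 1024 * 1024L;  // illustrative cap
    while (running) {
        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            bufferedBytes += Math.max(record.serializedKeySize(), 0)
                           + Math.max(record.serializedValueSize(), 0);
        }
        if (bufferedBytes > maxBufferedBytes) {
            consumer.pause(consumer.assignment());  // stop fetching; group membership is kept
        }
        bufferedBytes -= process(records);          // hypothetical draining step
        if (bufferedBytes <= maxBufferedBytes) {
            consumer.resume(consumer.paused());     // pick paused partitions back up
        }
    }

As the removed in-line comment notes, the reverted code deliberately paused only non-empty partitions, because pausing a task's empty partitions can sacrifice ordered processing and even deadlock (see KAFKA-13152).
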
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 2d130859623..d5a0ee3d03c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1071,17 +1071,6 @@ public class TaskManager {
         }
     }
 
-    /**
-     *  Fetch all non-empty partitions for pausing
-     */
-    Set<TopicPartition> nonEmptyPartitions() {
-        final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
-        for (final Task task : activeTaskIterable()) {
-            nonEmptyPartitions.addAll(((StreamTask) task).getNonEmptyTopicPartitions());
-        }
-        return nonEmptyPartitions;
-    }
-
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
@@ -1185,14 +1174,6 @@ public class TaskManager {
         }
     }
 
-    long getInputBufferSizeInBytes() {
-        long bytesBuffered = 0L;
-        for (final Task task : activeTaskIterable()) {
-            bytesBuffered += ((StreamTask) task).totalBytesBuffered();
-        }
-        return bytesBuffered;
-    }
-
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      * @throws StreamsException      if any task threw an exception while processing
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
index 8949390e7f0..cfa1ac6333a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
@@ -84,11 +84,6 @@ public class TaskMetrics {
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
-    private static final String INPUT_BUFFER_BYTES_TOTAL = "input-buffer-bytes-total";
-    private static final String INPUT_BUFFER_BYTES_TOTAL_DESCRIPTION = "The total number of bytes accumulated in this task's input buffer";
-    private static final String CACHE_SIZE_BYTES_TOTAL = "cache-size-bytes-total";
-    private static final String CACHE_SIZE_BYTES_TOTAL_DESCRIPTION = "The total size in bytes of this task's cache.";
-
     public static Sensor processLatencySensor(final String threadId,
                                               final String taskId,
                                               final StreamsMetricsImpl streamsMetrics) {
@@ -133,38 +128,6 @@ public class TaskMetrics {
         return sensor;
     }
 
-    public static Sensor totalInputBufferBytesSensor(final String threadId,
-                                                     final String taskId,
-                                                     final StreamsMetricsImpl streamsMetrics) {
-        final String name = INPUT_BUFFER_BYTES_TOTAL;
-        final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, name, RecordingLevel.INFO);
-
-        addValueMetricToSensor(
-            sensor,
-            TASK_LEVEL_GROUP,
-            streamsMetrics.taskLevelTagMap(threadId, taskId),
-            name,
-            INPUT_BUFFER_BYTES_TOTAL_DESCRIPTION
-        );
-        return sensor;
-    }
-
-    public static Sensor totalCacheSizeBytesSensor(final String threadId,
-                                                   final String taskId,
-                                                   final StreamsMetricsImpl streamsMetrics) {
-        final String name = CACHE_SIZE_BYTES_TOTAL;
-        final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, name, Sensor.RecordingLevel.INFO);
-
-        addValueMetricToSensor(
-                sensor,
-                TASK_LEVEL_GROUP,
-                streamsMetrics.taskLevelTagMap(threadId, taskId),
-                name,
-                CACHE_SIZE_BYTES_TOTAL_DESCRIPTION
-        );
-        return sensor;
-    }
-
     public static Sensor punctuateSensor(final String threadId,
                                          final String taskId,
                                          final StreamsMetricsImpl streamsMetrics) {
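Both deleted sensors were plain value gauges: each record() call stored the latest buffer or cache size and the metric reported it. Outside the Streams-internal StreamsMetricsImpl helpers, the same shape can be reproduced with the public metrics API; a sketch with illustrative group and description strings:

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Value;

    final Metrics metrics = new Metrics();
    final Sensor sensor = metrics.sensor("input-buffer-bytes-total");
    final MetricName name = metrics.metricName(
        "input-buffer-bytes-total",
        "stream-task-metrics",                          // illustrative group
        "Total bytes in this task's input buffer");     // illustrative description
    sensor.add(name, new Value());                      // gauge: reports the last recorded value
    sensor.record(1_048_576L);                          // e.g. after each buffer-size change
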
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 71fdd4f356c..ecf063b7834 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.internals.metrics.NamedCacheMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +48,6 @@ class NamedCache {
 
     private final StreamsMetricsImpl streamsMetrics;
     private final Sensor hitRatioSensor;
-    private final Sensor totalCacheSizeSensor;
 
     // internal stats
     private long numReadHits = 0;
@@ -68,11 +66,6 @@ class NamedCache {
             taskName,
             storeName
         );
-        totalCacheSizeSensor = TaskMetrics.totalCacheSizeBytesSensor(
-            Thread.currentThread().getName(),
-            taskName,
-            streamsMetrics
-        );
     }
 
     synchronized final String name() {
@@ -189,7 +182,6 @@ class NamedCache {
             dirtyKeys.add(key);
         }
         currentSizeBytes += node.size();
-        totalCacheSizeSensor.record(currentSizeBytes);
     }
 
     synchronized long sizeInBytes() {
@@ -251,7 +243,6 @@ class NamedCache {
         if (eldest.entry.isDirty()) {
             flush(eldest);
         }
-        totalCacheSizeSensor.record(currentSizeBytes);
     }
 
     synchronized LRUCacheEntry putIfAbsent(final Bytes key, final LRUCacheEntry value) {
@@ -278,7 +269,6 @@ class NamedCache {
         remove(node);
         dirtyKeys.remove(key);
         currentSizeBytes -= node.size();
-        totalCacheSizeSensor.record(currentSizeBytes);
         return node.entry();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index b0f0647731c..b788e54923a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -150,8 +150,6 @@ public class KafkaStreamsTest {
     private GlobalStreamThread globalStreamThread;
     @Mock
     private Metrics metrics;
-    @Mock
-    private State state;
 
     private StateListenerStub streamsStateListener;
     private Capture<List<MetricsReporter>> metricsReportersCapture;
@@ -225,7 +223,6 @@ public class KafkaStreamsTest {
         ClientMetrics.addStateMetric(anyObject(StreamsMetricsImpl.class), anyObject());
         ClientMetrics.addNumAliveStreamThreadMetric(anyObject(StreamsMetricsImpl.class), anyObject());
 
-
         // setup stream threads
         PowerMock.mockStatic(StreamThread.class);
         EasyMock.expect(StreamThread.create(
@@ -239,7 +236,6 @@ public class KafkaStreamsTest {
             anyObject(Time.class),
             anyObject(StreamsMetadataState.class),
             anyLong(),
-            anyLong(),
             anyObject(StateDirectory.class),
             anyObject(StateRestoreListener.class),
             anyInt(),
@@ -250,13 +246,8 @@ public class KafkaStreamsTest {
         PowerMock.mockStatic(StreamsConfigUtils.class);
         EasyMock.expect(StreamsConfigUtils.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE).anyTimes();
         EasyMock.expect(StreamsConfigUtils.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes();
-        EasyMock.expect(StreamsConfigUtils.getTotalCacheSize(anyObject(StreamsConfig.class))).andReturn(10 * 1024 * 1024L).anyTimes();
         EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes();
         EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes();
-        EasyMock.expect(streamThreadOne.getCacheSize()).andReturn(10485760L).anyTimes();
-        EasyMock.expect(streamThreadOne.getMaxBufferSize()).andReturn(536870912L).anyTimes();
-        EasyMock.expect(streamThreadTwo.getCacheSize()).andReturn(10485760L).anyTimes();
-        EasyMock.expect(streamThreadTwo.getMaxBufferSize()).andReturn(536870912L).anyTimes();
         prepareStreamThread(streamThreadOne, 1, true);
         prepareStreamThread(streamThreadTwo, 2, false);
 
@@ -305,8 +296,6 @@ public class KafkaStreamsTest {
         EasyMock.expect(globalStreamThread.stillRunning()).andReturn(globalThreadState.get() == GlobalStreamThread.State.RUNNING).anyTimes();
         globalStreamThread.join();
         EasyMock.expectLastCall().anyTimes();
-        globalStreamThread.resize(EasyMock.anyLong());
-        EasyMock.expectLastCall().anyTimes();
 
         PowerMock.replay(
             StreamThread.class,
@@ -363,7 +352,7 @@ public class KafkaStreamsTest {
         ).anyTimes();
         EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class), anyLong())).andStubReturn(true);
         EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1);
-        thread.resizeCacheAndBufferMemory(EasyMock.anyLong(), EasyMock.anyLong());
+        thread.resizeCache(EasyMock.anyLong());
         EasyMock.expectLastCall().anyTimes();
         thread.requestLeaveGroupDuringShutdown();
         EasyMock.expectLastCall().anyTimes();
@@ -648,27 +637,6 @@ public class KafkaStreamsTest {
             streams.start();
             final int oldSize = streams.threads.size();
             waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
-            EasyMock.reset(streamThreadOne, streamThreadTwo);
-            EasyMock.expect(streamThreadOne.isRunning()).andStubReturn(true);
-            EasyMock.expect(streamThreadTwo.isRunning()).andStubReturn(true);
-            EasyMock.expect(streamThreadOne.state()).andStubReturn(StreamThread.State.RUNNING);
-            EasyMock.expect(streamThreadTwo.state()).andStubReturn(StreamThread.State.RUNNING);
-            EasyMock.expect(streamThreadOne.getName()).andStubReturn("processId-StreamThread-1");
-            EasyMock.expect(streamThreadTwo.getName()).andStubReturn("processId-StreamThread-2");
-            EasyMock.expect(streamThreadTwo.getId()).andStubReturn(2L);
-            EasyMock.expect(streamThreadOne.getCacheSize()).andReturn(10485760L).anyTimes();
-            EasyMock.expect(streamThreadOne.getMaxBufferSize()).andReturn(536870912L).anyTimes();
-            EasyMock.expect(streamThreadTwo.getCacheSize()).andReturn(10485760L).anyTimes();
-            EasyMock.expect(streamThreadTwo.getMaxBufferSize()).andReturn(536870912L).anyTimes();
-            streamThreadTwo.setStateListener(EasyMock.anyObject());
-            streamThreadTwo.start();
-
-            streamThreadOne.resizeCacheAndBufferMemory(5 * 1024 * 1024L, 256 * 1024 * 1024L);
-            streamThreadTwo.resizeCacheAndBufferMemory(5 * 1024 * 1024L, 256 * 1024 * 1024L);
-            streamThreadOne.shutdown();
-            streamThreadTwo.shutdown();
-            EasyMock.expect(state.isRunningOrRebalancing()).andStubReturn(true);
-            EasyMock.replay(streamThreadOne, streamThreadTwo, state);
             assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
             assertThat(streams.threads.size(), equalTo(oldSize + 1));
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index aaf68587072..435dd249f2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -65,7 +65,6 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFI
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
-import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
 import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.hasItem;
@@ -1256,36 +1255,6 @@ public class StreamsConfigTest {
         );
     }
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldUseStateStoreCacheMaxBytesWhenBothOldAndNewConfigsAreSet() {
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100);
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
-        final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 100);
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() {
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
-        final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 10);
-    }
-
-    @Test
-    public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() {
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10);
-        final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 10);
-    }
-
-    @Test
-    public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
-        final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
-    }
-
     @Test
     public void testInvalidSecurityProtocol() {
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 5f717e336f6..d41cec04406 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -123,7 +123,7 @@ public abstract class AbstractJoinIntegrationTest {
 
     void prepareEnvironment() throws InterruptedException {
         if (!cacheEnabled) {
-            STREAMS_CONFIG.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+            STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         }
 
         STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 83b843a6a50..2cead2f6929 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -151,7 +151,7 @@ public abstract class AbstractResetIntegrationTest {
         streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
         streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
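A pattern worth noting in the test hunks above: the integration tests pin the cache to zero and shorten the commit interval so that every update propagates deterministically. The same two lines are useful in any Streams test that asserts on individual output records:

    final Properties streamsConfig = new Properties();
    streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);  // disable record caches
    streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);      // flush and commit often
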
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index e70f0a9861b..c8c30f6837b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -379,7 +379,7 @@ public class AdjustStreamThreadCountTest {
         final Properties props = new Properties();
         props.putAll(properties);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, totalCacheBytes);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totalCacheBytes);
 
         try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
             addStreamStateChangeListener(kafkaStreams);
@@ -390,32 +390,7 @@ public class AdjustStreamThreadCountTest {
 
                 for (final String log : appender.getMessages()) {
                     // all 10 bytes should be available for remaining thread
-                    if (log.contains("Resizing thread cache/max buffer size due to removal of thread ") && log.contains(", new cache size/max buffer size per thread is 10/536870912")) {
-                        return;
-                    }
-                }
-            }
-        }
-        fail();
-    }
-
-    @Test
-    public void shouldResizeMaxBufferAfterThreadRemovalTimesOut() throws InterruptedException {
-        final long maxBufferBytes = 10L;
-        final Properties props = new Properties();
-        props.putAll(properties);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
-        props.put(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG, maxBufferBytes);
-
-        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
-            addStreamStateChangeListener(kafkaStreams);
-            startStreamsAndWaitForRunning(kafkaStreams);
-
-            try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class)) {
-                assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ofSeconds(0)));
-                for (final String log : appender.getMessages()) {
-                    // all 10 bytes should be available for remaining thread
-                    if (log.contains("Resizing thread cache/max buffer size due to removal of thread ") && log.contains(", new cache size/max buffer size per thread is 10485760/10")) {
+                    if (log.endsWith("Resizing thread cache due to thread removal, new cache size per thread is 10")) {
                         return;
                     }
                 }
@@ -426,14 +401,12 @@ public class AdjustStreamThreadCountTest {
 
     @Test
     @SuppressWarnings("deprecation")
-    public void shouldResizeCacheAndInputBufferAfterThreadReplacement() throws InterruptedException {
+    public void shouldResizeCacheAfterThreadReplacement() throws InterruptedException {
         final long totalCacheBytes = 10L;
-        final long maxBufferBytes = 100L;
         final Properties props = new Properties();
         props.putAll(properties);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, totalCacheBytes);
-        props.put(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG, maxBufferBytes);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totalCacheBytes);
 
         final AtomicBoolean injectError = new AtomicBoolean(false);
 
@@ -473,9 +446,8 @@ public class AdjustStreamThreadCountTest {
                 waitForTransitionFromRebalancingToRunning();
 
                 for (final String log : appender.getMessages()) {
-                    // after we replace the thread there should be two remaining threads with 5 bytes each for
-                    // the cache and 50 for the input buffer
-                    if (log.endsWith("Adding StreamThread-3, there are now 2 threads with cache size/max buffer size values as 5/50 per thread.")) {
+                    // after we replace the thread there should be two remaining threads with 5 bytes each
+                    if (log.endsWith("Adding StreamThread-3, there will now be 2 live threads and the new cache size per thread is 5")) {
                         return;
                     }
                 }
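
The log assertions above boil down to an even split of the configured total across live stream threads: 10 bytes over the single remaining thread is 10, and 10 bytes over the two threads present after a replacement is 5. A hedged sketch of that arithmetic (the helper name is hypothetical, not the internal method):

    // Hypothetical helper mirroring the redistribution the log messages assert.
    static long cacheSizePerThread(final long totalCacheBytes, final int numLiveThreads) {
        return numLiveThreads == 0 ? totalCacheBytes : totalCacheBytes / numLiveThreads;
    }
    // cacheSizePerThread(10L, 1) == 10  ->  "new cache size per thread is 10"
    // cacheSizePerThread(10L, 2) == 5   ->  "new cache size per thread is 5"
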
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index 3d2bcb8055d..27958730cfe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -97,7 +97,7 @@ public class EmitOnChangeIntegrationTest {
                 mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
                 mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
                 mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
-                mkEntry(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0),
+                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
                 mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
                 mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
                 mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 7fd07b01ff6..12cb0bf9563 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -249,7 +249,7 @@ public class EosIntegrationTest {
 
         final Properties properties = new Properties();
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
-        properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
@@ -326,7 +326,7 @@ public class EosIntegrationTest {
 
         final Properties properties = new Properties();
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
-        properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -940,7 +940,7 @@ public class EosIntegrationTest {
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), maxPollIntervalMs);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), maxPollIntervalMs - 1);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs);
-        properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
index aacb9e0483b..7f652f0c7f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
@@ -949,7 +949,7 @@ public class EosV2UpgradeIntegrationTest {
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
         properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) commitInterval);
         properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), KeyPartitioner.class);
-        properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
         properties.put(InternalConfig.ASSIGNMENT_LISTENER, assignmentListener);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
deleted file mode 100644
index b3c61248254..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
-import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-@Category(IntegrationTest.class)
-public class ErrorHandlingIntegrationTest {
-
-    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
-
-    @BeforeClass
-    public static void startCluster() throws IOException {
-        CLUSTER.start();
-    }
-
-    @AfterClass
-    public static void closeCluster() {
-        CLUSTER.stop();
-    }
-
-    @Rule
-    public TestName testName = new TestName();
-
-    private final String testId = safeUniqueTestName(getClass(), testName);
-    private final String appId = "appId_" + testId;
-    private final Properties properties = props();
-
-    // Task 0
-    private final String inputTopic = "input" + testId;
-    private final String outputTopic = "output" + testId;
-    // Task 1
-    private final String errorInputTopic = "error-input" + testId;
-    private final String errorOutputTopic = "error-output" + testId;
-
-    @Before
-    public void setup() {
-        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, errorInputTopic, errorOutputTopic, inputTopic, outputTopic);
-    }
-
-    private Properties props() {
-        return mkObjectProperties(
-            mkMap(
-                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
-                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
-                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()),
-                mkEntry(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0),
-                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L),
-                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
-                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class))
-        );
-    }
-
-    @Test
-    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {
-        final AtomicInteger noOutputExpected = new AtomicInteger(0);
-        final AtomicInteger outputExpected = new AtomicInteger(0);
-
-        try (final KafkaStreamsNamedTopologyWrapper kafkaStreams = new KafkaStreamsNamedTopologyWrapper(properties)) {
-            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
-
-            final NamedTopologyBuilder builder = kafkaStreams.newNamedTopologyBuilder("topology_A");
-            builder.stream(inputTopic).peek((k, v) -> outputExpected.incrementAndGet()).to(outputTopic);
-            builder.stream(errorInputTopic)
-                .peek((k, v) -> {
-                    throw new RuntimeException("Kaboom");
-                })
-                .peek((k, v) -> noOutputExpected.incrementAndGet())
-                .to(errorOutputTopic);
-
-            kafkaStreams.addNamedTopology(builder.build());
-
-            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                errorInputTopic,
-                Arrays.asList(
-                    new KeyValue<>(1, "A")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerDeserializer.class,
-                    StringDeserializer.class
-                ),
-                outputTopic,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                )
-            );
-            assertThat(noOutputExpected.get(), equalTo(0));
-            assertThat(outputExpected.get(), equalTo(2));
-        }
-    }
-}
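
The deleted test above exercised thread replacement under a named topology. The public handler API it relied on is untouched by the revert; a minimal usage sketch, assuming a running KafkaStreams instance named kafkaStreams:

    // Replace a crashed stream thread instead of shutting the client down.
    kafkaStreams.setUncaughtExceptionHandler(
        exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
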
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index b7b0b5a7149..05f2359efc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -147,7 +147,7 @@ public class FineGrainedAutoResetIntegrationTest {
     public void setUp() throws IOException {
 
         final Properties props = new Properties();
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -286,7 +286,7 @@ public class FineGrainedAutoResetIntegrationTest {
     @Test
     public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException {
         final Properties props = new Properties();
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index b2272d0ec2b..3ac94ad9683 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -136,7 +136,7 @@ public class GlobalKTableEOSIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
         streamsConfiguration.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 64a83dff9da..d2c92a761dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -114,7 +114,7 @@ public class GlobalKTableIntegrationTest {
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
                                           Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 825ec13028d..6f4bd534277 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -121,7 +121,7 @@ public class GlobalThreadShutDownOrderTest {
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
 
         final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 6e5ec644a49..98d6c7c4bd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -109,7 +109,7 @@ public class InternalTopicIntegrationTest {
         streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
-        streamsProp.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 99f03df22bd..f7a32934b46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -111,7 +111,7 @@ public class KStreamAggregationDedupIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
 
         final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
         stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 9ec0806cc2d..a740605bd8c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -146,7 +146,7 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index 1cff0fc2016..5043ee2f8b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -142,7 +142,7 @@ public class KStreamRepartitionIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index d5f411ef1fb..006dd9275ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -185,7 +185,6 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
         assertEquals(expectedResult, result);
     }
 
-    @SuppressWarnings("deprecation")
     private static Properties getStreamsConfig() {
         final Properties streamsConfig = new Properties();
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Partitioner");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
index 82f6ffae188..3604f1127e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
@@ -212,7 +212,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Multi");
         streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
 
         return streamsConfig;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index b5778835f80..6b98d77589b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -72,7 +72,7 @@ public class KTableSourceTopicRestartIntegrationTest {
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        STREAMS_CONFIG.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5L);
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
         STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 2e1dd141f85..c937999ca5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -198,8 +198,6 @@ public class MetricsIntegrationTest {
     private static final String THREAD_START_TIME = "thread-start-time";
     private static final String ACTIVE_PROCESS_RATIO = "active-process-ratio";
     private static final String ACTIVE_BUFFER_COUNT = "active-buffer-count";
-    private static final String INPUT_BUFFER_BYTES_TOTAL = "input-buffer-bytes-total";
-    private static final String CACHE_SIZE_BYTES_TOTAL = "cache-size-bytes-total";
     private static final String SKIPPED_RECORDS_RATE = "skipped-records-rate";
     private static final String SKIPPED_RECORDS_TOTAL = "skipped-records-total";
     private static final String RECORD_LATENESS_AVG = "record-lateness-avg";
@@ -260,7 +258,7 @@ public class MetricsIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
     }
@@ -537,8 +535,6 @@ public class MetricsIntegrationTest {
         checkMetricByName(listMetricTask, PUNCTUATE_TOTAL, 4);
         checkMetricByName(listMetricTask, PROCESS_RATE, 4);
         checkMetricByName(listMetricTask, PROCESS_TOTAL, 4);
-        checkMetricByName(listMetricTask, INPUT_BUFFER_BYTES_TOTAL, 4);
-        checkMetricByName(listMetricTask, CACHE_SIZE_BYTES_TOTAL, 3);
     }
 
     private void checkProcessorNodeLevelMetrics() {
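
With the two KIP-770 gauges dropped from the task-level checks, the remaining task metrics are still reachable through the public KafkaStreams#metrics() map. A minimal sketch, using process-total (which the test continues to verify) as the example:

    final Map<MetricName, ? extends Metric> metrics = kafkaStreams.metrics();
    metrics.forEach((name, metric) -> {
        if ("process-total".equals(name.name())) {  // task-level metric kept by this revert
            System.out.println(name.group() + "/" + name.name() + " = " + metric.metricValue());
        }
    });
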
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index 727d89fa07f..8facc8279d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -820,7 +820,7 @@ public class NamedTopologyIntegrationTest {
         try {
             final AtomicInteger noOutputExpected = new AtomicInteger(0);
             final AtomicInteger outputExpected = new AtomicInteger(0);
-            props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
             props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L);
             props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
             props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index 2e709af0b11..8f8faea448f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -213,7 +213,7 @@ public class OptimizedKTableIntegrationTest {
         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
-        config.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
         config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
         config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
index e09ba997b2e..c897e0602f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
@@ -132,7 +132,7 @@ public class PauseResumeIntegrationTest {
         properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
-        properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 82409af50f8..a5dbdcc18c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -967,7 +967,7 @@ public class QueryableStateIntegrationTest {
     }
 
     private void verifyCanQueryState(final int cacheSizeBytes) throws Exception {
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSizeBytes);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
         final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 1a22c90fcf1..894108c246d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -133,7 +133,7 @@ public class RegexSourceIntegrationTest {
     public void setUp() throws InterruptedException {
         outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());
         final Properties properties = new Properties();
-        properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index b7523a99e5c..dc0be1a1c71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -126,7 +126,7 @@ public class RestoreIntegrationTest {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 2d667839fae..17610c8450d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -200,7 +200,7 @@ public class RocksDBMetricsIntegrationTest {
         streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
         streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         return streamsConfiguration;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
index 395ef6efd66..b15ecee4942 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
@@ -141,7 +141,7 @@ public class SlidingWindowedKStreamIntegrationTest {
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index 2ac2c2bddee..ed68a1f33ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -403,7 +403,7 @@ public class StandbyTaskEOSIntegrationTest {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
         streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index 17df2b59a21..6c6fc5d24b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -94,7 +94,7 @@ public class StoreUpgradeIntegrationTest {
         final String safeTestName = safeUniqueTestName(getClass(), testName);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
index 38dc3a6ffc1..84876e77fd7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
@@ -124,7 +124,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index 9abc2c9500d..e0ec6c9bbef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -148,7 +148,7 @@ public class TimeWindowedKStreamIntegrationTest {
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 49e7913dc0e..0974ed6464b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -51,7 +51,7 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
-@SuppressWarnings({"unchecked"})
+@SuppressWarnings("unchecked")
 public class KTableFilterTest {
     private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
@@ -59,7 +59,7 @@ public class KTableFilterTest {
     @Before
     public void setUp() {
         // disable caching at the config level
-        props.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
+        props.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
     }
 
     private final Predicate<String, Integer> predicate = (key, value) -> (value % 2) == 0;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 41876581b38..3356b37c408 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -104,7 +104,7 @@ public class SessionWindowedKStreamImplTest {
 
     @Test
     public void shouldCountSessionWindowedWithCachingDisabled() {
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         shouldCountSessionWindowed();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index d05945b6425..f620cd22c18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -966,22 +966,9 @@ public class InternalTopologyBuilderTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void shouldUseNonDeprecatedConfigToSetCacheBytesWhenBothDeprecatedAndNonDeprecatedConfigsUsed() {
-        final Properties globalProps = StreamsTestUtils.getStreamsConfig();
-        globalProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 200L);
-        globalProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100L);
-        final StreamsConfig globalStreamsConfig = new StreamsConfig(globalProps);
-        final InternalTopologyBuilder topologyBuilder = builder.rewriteTopology(globalStreamsConfig);
-        assertThat(topologyBuilder.topologyConfigs(), equalTo(new TopologyConfig(null, globalStreamsConfig, new Properties())));
-        assertThat(topologyBuilder.topologyConfigs().cacheSize, equalTo(200L));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
     public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() {
         final Properties topologyOverrides = new Properties();
-        topologyOverrides.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 12345L);
+        topologyOverrides.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 12345L);
         topologyOverrides.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
         topologyOverrides.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L);
         topologyOverrides.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15);
@@ -1007,10 +994,9 @@ public class InternalTopologyBuilderTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() {
         final Properties streamsProps = StreamsTestUtils.getStreamsConfig();
-        streamsProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 12345L);
+        streamsProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 12345L);
         streamsProps.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
         streamsProps.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L);
         streamsProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index e29cf69d616..632b0efc23c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -19,11 +19,9 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Value;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -46,13 +44,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.OptionalLong;
 import java.util.UUID;
-import java.util.Collections;
-import java.util.Optional;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.consumerRecordSizeInBytes;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -89,7 +84,6 @@ public class PartitionGroupTest {
     private final Metrics metrics = new Metrics();
     private final Sensor enforcedProcessingSensor = metrics.sensor(UUID.randomUUID().toString());
     private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap());
-    private final MetricName totalBytesValue = new MetricName("total-bytes-last-value", "", "", mkMap());
 
 
     private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) {
@@ -491,7 +485,6 @@ public class PartitionGroupTest {
             mkMap(mkEntry(partition1, queue1)),
             tp -> OptionalLong.of(0L),
             getValueSensor(metrics, lastLatenessValue),
-            getValueSensor(metrics, totalBytesValue),
             enforcedProcessingSensor,
             maxTaskIdleMs
         );
@@ -525,7 +518,6 @@ public class PartitionGroupTest {
             mkMap(mkEntry(partition1, queue1)),
             tp -> OptionalLong.of(0L),
             getValueSensor(metrics, lastLatenessValue),
-            getValueSensor(metrics, totalBytesValue),
             enforcedProcessingSensor,
             maxTaskIdleMs
         );
@@ -561,7 +553,6 @@ public class PartitionGroupTest {
             ),
             tp -> OptionalLong.of(0L),
             getValueSensor(metrics, lastLatenessValue),
-            getValueSensor(metrics, totalBytesValue),
             enforcedProcessingSensor,
             StreamsConfig.MAX_TASK_IDLE_MS_DISABLED
         );
@@ -600,7 +591,6 @@ public class PartitionGroupTest {
             ),
             tp -> OptionalLong.of(0L),
             getValueSensor(metrics, lastLatenessValue),
-            getValueSensor(metrics, totalBytesValue),
             enforcedProcessingSensor,
             0L
         );
@@ -640,7 +630,6 @@ public class PartitionGroupTest {
             ),
             tp -> lags.getOrDefault(tp, OptionalLong.empty()),
             getValueSensor(metrics, lastLatenessValue),
-            getValueSensor(metrics, totalBytesValue),
             enforcedProcessingSensor,
             0L
         );
@@ -677,7 +666,6 @@ public class PartitionGroupTest {
             ),
             tp -> lags.getOrDefault(tp, OptionalLong.empty()),
             getValueSensor(metrics, lastLatenessValue),
-            getValueSensor(metrics, totalBytesValue),
             enforcedProcessingSensor,
             0L
         );
@@ -714,7 +702,6 @@ public class PartitionGroupTest {
             ),
             tp -> OptionalLong.of(0L),
             getValueSensor(metrics, lastLatenessValue),
-            getValueSensor(metrics, totalBytesValue),
             enforcedProcessingSensor,
             1L
         );
@@ -777,81 +764,6 @@ public class PartitionGroupTest {
         }
     }
 
-    @Test
-    public void shouldUpdateTotalBytesBufferedOnRecordsAdditionAndConsumption() {
-        final PartitionGroup group = getBasicGroup();
-
-        assertEquals(0, group.numBuffered());
-        assertEquals(0L, group.totalBytesBuffered());
-
-        // add 3 records with timestamps 1, 5, 3 to partition-1
-        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-                new ConsumerRecord<>("topic", 1, 1L, new MockTime().milliseconds(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>("topic", 1, 5L, new MockTime().milliseconds(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>("topic", 1, 3L, new MockTime().milliseconds(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
-
-        long partition1TotalBytes = getBytesBufferedForRawRecords(list1);
-        group.addRawRecords(partition1, list1);
-
-        verifyBuffered(3, 3, 0, group);
-        assertEquals(group.totalBytesBuffered(), partition1TotalBytes);
-        assertEquals(-1L, group.streamTime());
-        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
-        assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition1TotalBytes));
-
-        StampedRecord record;
-        final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
-
-        // get first two records from partition 1
-        record = group.nextRecord(info, time.milliseconds());
-        assertEquals(record.timestamp, 1L);
-        record = group.nextRecord(info, time.milliseconds());
-        assertEquals(record.timestamp, 5L);
-
-        partition1TotalBytes -= getBytesBufferedForRawRecords(Arrays.asList(list1.get(0), list1.get(1)));
-        assertEquals(group.totalBytesBuffered(), partition1TotalBytes);
-        assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition1TotalBytes));
-
-        // add three records with timestamps 2, 4, 6 to partition-2
-        final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
-                new ConsumerRecord<>("topic", 2, 2L, record.timestamp, TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>("topic", 2, 4L, record.timestamp, TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>("topic", 2, 6L, record.timestamp, TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
-
-        long partition2TotalBytes = getBytesBufferedForRawRecords(list2);
-        group.addRawRecords(partition2, list2);
-        // 1:[3]
-        // 2:[2, 4, 6]
-        assertEquals(group.totalBytesBuffered(), partition2TotalBytes + partition1TotalBytes);
-        assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition2TotalBytes + partition1TotalBytes));
-
-        // get one record, next record should be ts=2 from partition 2
-        record = group.nextRecord(info, time.milliseconds());
-        // 1:[3]
-        // 2:[4, 6]
-        partition2TotalBytes -= getBytesBufferedForRawRecords(Collections.singletonList(list2.get(0)));
-        assertEquals(group.totalBytesBuffered(), partition2TotalBytes + partition1TotalBytes);
-        assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition2TotalBytes + partition1TotalBytes));
-        assertEquals(record.timestamp, 2L);
-
-        // get one record, next up should have ts=3 from partition 1 (even though it has seen a larger max timestamp =5)
-        record = group.nextRecord(info, time.milliseconds());
-        // 1:[]
-        // 2:[4, 6]
-        partition1TotalBytes -= getBytesBufferedForRawRecords(Collections.singletonList(list1.get(2)));
-        assertEquals(group.totalBytesBuffered(), partition2TotalBytes + partition1TotalBytes);
-        assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition2TotalBytes + partition1TotalBytes));
-        assertEquals(record.timestamp, 3L);
-    }
-
-    private long getBytesBufferedForRawRecords(final List<ConsumerRecord<byte[], byte[]>> rawRecords) {
-        long rawRecordsSizeInBytes = 0L;
-        for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-            rawRecordsSizeInBytes += consumerRecordSizeInBytes(rawRecord);
-        }
-        return rawRecordsSizeInBytes;
-    }
-
     private PartitionGroup getBasicGroup() {
         return new PartitionGroup(
             logContext,
@@ -861,7 +773,6 @@ public class PartitionGroupTest {
             ),
             tp -> OptionalLong.of(0L),
             getValueSensor(metrics, lastLatenessValue),
-            getValueSensor(metrics, totalBytesValue),
             enforcedProcessingSensor,
             maxTaskIdleMs
         );
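
For context on the hunks above: KIP-770 taught PartitionGroup to track the total bytes buffered across its record queues, and the deleted test exercised that accounting. A minimal sketch of how such per-record byte accounting can work (this approximates the removed consumerRecordSizeInBytes helper; it is not the reverted implementation):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Header;

    final class RecordSizeEstimator {
        // Rough footprint of a raw record: key + value + headers + topic name.
        static long sizeInBytes(final ConsumerRecord<byte[], byte[]> record) {
            long size = Math.max(0, record.serializedKeySize())
                      + Math.max(0, record.serializedValueSize())
                      + record.topic().length();
            for (final Header header : record.headers()) {
                size += header.key().length();
                size += header.value() == null ? 0 : header.value().length;
            }
            return size;
        }
    }

Summing this estimate on add and subtracting it on consume yields the totalBytesBuffered() figure the deleted assertions checked.
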
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index 7d0e7165ba7..251a2637c43 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -110,7 +110,7 @@ public class RepartitionOptimizingTest {
     @Before
     public void setUp() {
         streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
-        streamsConfiguration.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Integer.toString(1024 * 10));
+        streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10));
         streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000));
 
         processorValueCollector.clear();
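
Note that the revert also swaps this test back from the KIP-770 config name (StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG) to the long-standing cache.max.bytes.buffering key. A minimal sketch of the restored usage (the value is illustrative):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public final class CacheConfigExample {
        public static Properties withSmallCache() {
            final Properties props = new Properties();
            // Restored pre-KIP-770 name; bounds the bytes used for record caching
            // across all threads. Setting 0 disables caching entirely.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024);
            return props;
        }
    }
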
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
index bc04505ebfe..b388a6a67ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
@@ -86,7 +86,7 @@ public class RepartitionWithMergeOptimizingTest {
     @Before
     public void setUp() {
         streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
-        streamsConfiguration.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Integer.toString(1024 * 10));
+        streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10));
         streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 17f67ec792f..dd6bb6cafc5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -108,6 +108,7 @@ public class StandbyTaskTest {
         return new StreamsConfig(mkProperties(mkMap(
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
+            mkEntry(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"),
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()),
             mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName())
         )));
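
The hunk above re-pins buffered.records.per.partition, the pre-KIP-770 knob that caps buffering per input partition by record count rather than by bytes. A hedged sketch of a test-style config that sets it (application id and bootstrap address are illustrative):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public final class BufferedRecordsExample {
        public static Properties testProps() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
            // Pause fetching from a partition once this many records are buffered
            // for it; fetching resumes after the task consumes them.
            props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 3);
            return props;
        }
    }
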
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 266e649ca71..8f18c01e3db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -227,7 +227,6 @@ public class StreamTaskTest {
         return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName());
     }
 
-    @SuppressWarnings("deprecation")
     private static StreamsConfig createConfig(
         final String eosConfig,
         final String enforcedProcessingValue,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 74deafd32bf..5d898c345bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -121,7 +121,6 @@ import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
-import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
@@ -164,7 +163,6 @@ public class StreamThreadTest {
     private final StateDirectory stateDirectory = new StateDirectory(config, mockTime, true, false);
     private final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
-    private final long defaultMaxBufferSizeInBytes = 512 * 1024 * 1024;
 
     private StreamsMetadataState streamsMetadataState;
     private final static BiConsumer<Throwable, Boolean> HANDLER = (e, b) -> {
@@ -204,6 +202,7 @@ public class StreamThreadTest {
         return mkProperties(mkMap(
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
+            mkEntry(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"),
             mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()),
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()),
             mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? StreamsConfig.EXACTLY_ONCE_V2 : StreamsConfig.AT_LEAST_ONCE),
@@ -254,7 +253,6 @@ public class StreamThreadTest {
             mockTime,
             streamsMetadataState,
             0,
-            defaultMaxBufferSizeInBytes,
             stateDirectory,
             new MockStateRestoreListener(),
             threadIdx,
@@ -533,7 +531,6 @@ public class StreamThreadTest {
             mockTime,
             streamsMetadataState,
             0,
-            defaultMaxBufferSizeInBytes,
             stateDirectory,
             new MockStateRestoreListener(),
             threadIdx,
@@ -1204,8 +1201,7 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null,
-            defaultMaxBufferSizeInBytes
+            null
         ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
 
         final StreamsException thrown = assertThrows(StreamsException.class, thread::run);
@@ -1593,7 +1589,6 @@ public class StreamThreadTest {
             mockTime,
             streamsMetadataState,
             0,
-            defaultMaxBufferSizeInBytes,
             stateDirectory,
             new MockStateRestoreListener(),
             threadIdx,
@@ -2343,8 +2338,7 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null,
-            defaultMaxBufferSizeInBytes
+            null
         ) {
             @Override
             void runOnce() {
@@ -2411,8 +2405,7 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null,
-            defaultMaxBufferSizeInBytes
+            null
         ) {
             @Override
             void runOnce() {
@@ -2487,8 +2480,7 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null,
-            defaultMaxBufferSizeInBytes
+            null
         ) {
             @Override
             void runOnce() {
@@ -2558,8 +2550,7 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null,
-            defaultMaxBufferSizeInBytes
+            null
         ) {
             @Override
             void runOnce() {
@@ -2627,8 +2618,7 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null,
-            defaultMaxBufferSizeInBytes
+            null
         ) {
             @Override
             void runOnce() {
@@ -2781,205 +2771,6 @@ public class StreamThreadTest {
         }
     }
 
-    @Test
-    public void shouldPauseNonEmptyPartitionsWhenTotalBufferSizeExceedsMaxBufferSize() {
-        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
-        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
-        final List<TopicPartition> assignedPartitions = Collections.singletonList(t1p1);
-        consumer.assign(assignedPartitions);
-        records.put(t1p1, Collections.singletonList(new ConsumerRecord<>(
-                t1p1.topic(),
-                t1p1.partition(),
-                1,
-                mockTime.milliseconds(),
-                TimestampType.CREATE_TIME,
-                2,
-                6,
-                new byte[2],
-                new byte[6],
-                new RecordHeaders(),
-                Optional.empty())));
-        expect(consumer.poll(anyObject())).andReturn(new ConsumerRecords<>(records)).anyTimes();
-        EasyMock.replay(consumer, consumerGroupMetadata);
-        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
-
-        final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
-        final Metric testMetric = new KafkaMetric(
-                new Object(),
-                testMetricName,
-                (Measurable) (config, now) -> 0,
-                null,
-                new MockTime());
-        final Map<MetricName, Metric> dummyProducerMetrics = singletonMap(testMetricName, testMetric);
-
-        expect(taskManager.producerMetrics()).andReturn(dummyProducerMetrics);
-        EasyMock.replay(taskManager);
-
-        final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
-        topologyMetadata.buildAndRewriteTopology();
-
-        final StreamsMetricsImpl streamsMetrics =
-                new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
-        final StreamThread thread = new StreamThread(
-                mockTime,
-                config,
-                null,
-                consumer,
-                consumer,
-                changelogReader,
-                null,
-                taskManager,
-                streamsMetrics,
-                topologyMetadata,
-                CLIENT_ID,
-                new LogContext(""),
-                new AtomicInteger(),
-                new AtomicLong(Long.MAX_VALUE),
-                new LinkedList<>(),
-                null,
-                HANDLER,
-                null,
-                10
-        );
-        thread.setState(StreamThread.State.STARTING);
-        thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
-        thread.pollPhase();
-        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
-        thread.pollPhase();
-        EasyMock.reset(consumer);
-        consumer.pause(anyObject());
-        // Consumer.pause should be called only once, after the second polled record pushes the buffer past the max size.
-        EasyMock.expectLastCall().times(1);
-    }
-
-    @Test
-    public void shouldResumePartitionsAfterConsumptionWhenTotalBufferSizeIsLTEMaxBufferSize() {
-        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
-        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
-        final ChangelogReader changelogReader = EasyMock.createNiceMock(ChangelogReader.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        changelogReader.restore(anyObject());
-        expectLastCall().andVoid();
-
-        final Task task1 = mock(Task.class);
-        final Task task2 = mock(Task.class);
-
-        final TaskId taskId1 = new TaskId(0, 1);
-        final TaskId taskId2 = new TaskId(0, 2);
-
-        expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
-        expect(task1.id()).andReturn(taskId1).anyTimes();
-        expect(task1.inputPartitions()).andReturn(mkSet(t1p1)).anyTimes();
-        expect(task1.committedOffsets()).andReturn(new HashMap<>()).anyTimes();
-        expect(task1.highWaterMark()).andReturn(new HashMap<>()).anyTimes();
-        expect(task1.timeCurrentIdlingStarted()).andReturn(Optional.empty()).anyTimes();
-
-        expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
-        expect(task2.id()).andReturn(taskId2).anyTimes();
-        expect(task2.inputPartitions()).andReturn(mkSet(t1p2)).anyTimes();
-        expect(task2.committedOffsets()).andReturn(new HashMap<>()).anyTimes();
-        expect(task2.highWaterMark()).andReturn(new HashMap<>()).anyTimes();
-        expect(task2.timeCurrentIdlingStarted()).andReturn(Optional.empty()).anyTimes();
-        EasyMock.replay(task1, task2);
-
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
-        records.put(t1p1, Collections.singletonList(new ConsumerRecord<>(
-                t1p1.topic(),
-                t1p1.partition(),
-                1,
-                mockTime.milliseconds(),
-                TimestampType.CREATE_TIME,
-                2,
-                6,
-                new byte[2],
-                new byte[6],
-                new RecordHeaders(),
-                Optional.empty())));
-        records.put(t1p2, Collections.singletonList(new ConsumerRecord<>(
-                t1p2.topic(),
-                t1p2.partition(),
-                1,
-                mockTime.milliseconds(),
-                TimestampType.CREATE_TIME,
-                2,
-                6,
-                new byte[2],
-                new byte[6],
-                new RecordHeaders(),
-                Optional.empty())));
-
-        final List<TopicPartition> assignedPartitions = Arrays.asList(t1p1, t1p2);
-        consumer.assign(assignedPartitions);
-        expect(consumer.poll(anyObject())).andReturn(new ConsumerRecords<>(records));
-        EasyMock.replay(consumer, consumerGroupMetadata);
-
-        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
-
-        final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
-        final Metric testMetric = new KafkaMetric(
-                new Object(),
-                testMetricName,
-                (Measurable) (config, now) -> 0,
-                null,
-                new MockTime());
-        final Map<MetricName, Metric> dummyProducerMetrics = singletonMap(testMetricName, testMetric);
-        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
-        expect(taskManager.producerMetrics()).andReturn(dummyProducerMetrics);
-        expect(taskManager.activeTaskMap()).andReturn(mkMap(
-                mkEntry(taskId1, task1),
-                mkEntry(taskId2, task2)
-                ));
-        expect(taskManager.tasks()).andStubReturn(mkMap(
-                mkEntry(taskId1, task1),
-                mkEntry(taskId2, task2)
-        ));
-        expect(taskManager.standbyTaskMap()).andReturn(new HashMap<>());
-        expect(taskManager.commit(anyObject())).andReturn(0);
-        expect(taskManager.process(anyInt(), anyObject())).andReturn(1);
-        expect(taskManager.process(anyInt(), anyObject())).andReturn(1);
-        expect(taskManager.process(anyInt(), anyObject())).andReturn(0);
-
-        EasyMock.replay(taskManager);
-
-        final StreamsMetricsImpl streamsMetrics =
-                new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
-        final StreamThread thread = new StreamThread(
-                mockTime,
-                new StreamsConfig(configProps(true)),
-                null,
-                consumer,
-                consumer,
-                changelogReader,
-                null,
-                taskManager,
-                streamsMetrics,
-                new TopologyMetadata(internalTopologyBuilder, config),
-                CLIENT_ID,
-                new LogContext(""),
-                new AtomicInteger(),
-                new AtomicLong(Long.MAX_VALUE),
-                new LinkedList<>(),
-                null,
-                HANDLER,
-                null,
-                6
-        ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
-        thread.setState(StreamThread.State.STARTING);
-        thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
-        thread.setState(StreamThread.State.RUNNING);
-
-        thread.runOnce();
-        EasyMock.reset(consumer);
-        consumer.resume(anyObject());
-        // Consumer.resume should be called only once, after consumption brings the buffered bytes back under the max size.
-        EasyMock.expectLastCall().times(1);
-    }
-
     @Test
     public void shouldTransmitTaskManagerMetrics() {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
@@ -3048,8 +2839,7 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null,
-            defaultMaxBufferSizeInBytes
+            null
         );
         final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
         final Metric testMetric = new KafkaMetric(
@@ -3106,8 +2896,7 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             (e, b) -> { },
-            null,
-            defaultMaxBufferSizeInBytes
+            null
         ) {
             @Override
             void runOnce() {
@@ -3251,8 +3040,7 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null,
-            defaultMaxBufferSizeInBytes
+            null
         );
     }
 }
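
The two deleted tests above asserted KIP-770's byte-based backpressure: pause non-empty partitions when the total buffered bytes exceed the max, resume once consumption brings the total back under it. A generic sketch of that pause/resume pattern against the plain consumer API (not the reverted StreamThread code; the byte estimate and budget handling are illustrative):

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    final class BackpressureSketch {
        // Returns the new buffered-byte total after one poll, pausing or resuming
        // partitions so the buffer stays near the given budget.
        static long pollWithBudget(final Consumer<byte[], byte[]> consumer,
                                   final long maxBufferBytes,
                                   long bufferedBytes) {
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ZERO);
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                bufferedBytes += Math.max(0, record.serializedKeySize())
                               + Math.max(0, record.serializedValueSize());
            }
            if (bufferedBytes > maxBufferBytes) {
                consumer.pause(records.partitions());   // stop fetching the full queues
            } else if (!consumer.paused().isEmpty()) {
                consumer.resume(consumer.paused());     // buffer drained below the budget
            }
            return bufferedBytes;
        }
    }
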
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
index 1e3f47cd76d..a38fb322bc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
@@ -91,52 +91,6 @@ public class TaskMetricsTest {
         }
     }
 
-    @Test
-    public void shouldGetTotalBytesSensor() {
-        final String operation = "input-buffer-bytes-total";
-        when(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, operation, RecordingLevel.INFO))
-                .thenReturn(expectedSensor);
-        final String totalBytesDescription = "The total number of bytes accumulated in this task's input buffer";
-        when(streamsMetrics.taskLevelTagMap(THREAD_ID, TASK_ID)).thenReturn(tagMap);
-
-        try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
-            final Sensor sensor = TaskMetrics.totalInputBufferBytesSensor(THREAD_ID, TASK_ID, streamsMetrics);
-            streamsMetricsStaticMock.verify(
-                () -> StreamsMetricsImpl.addValueMetricToSensor(
-                    expectedSensor,
-                    TASK_LEVEL_GROUP,
-                    tagMap,
-                    operation,
-                    totalBytesDescription
-                )
-            );
-            assertThat(sensor, is(expectedSensor));
-        }
-    }
-
-    @Test
-    public void shouldGetTotalCacheSizeInBytesSensor() {
-        final String operation = "cache-size-bytes-total";
-        when(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, operation, RecordingLevel.INFO))
-                .thenReturn(expectedSensor);
-        final String totalBytesDescription = "The total size in bytes of this task's cache.";
-        when(streamsMetrics.taskLevelTagMap(THREAD_ID, TASK_ID)).thenReturn(tagMap);
-
-        try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
-            final Sensor sensor = TaskMetrics.totalCacheSizeBytesSensor(THREAD_ID, TASK_ID, streamsMetrics);
-            streamsMetricsStaticMock.verify(
-                () -> StreamsMetricsImpl.addValueMetricToSensor(
-                    expectedSensor,
-                    TASK_LEVEL_GROUP,
-                    tagMap,
-                    operation,
-                    totalBytesDescription
-                )
-            );
-            assertThat(sensor, is(expectedSensor));
-        }
-    }
-
     @Test
     public void shouldGetProcessLatencySensor() {
         final String operation = "process-latency";
@@ -299,4 +253,4 @@ public class TaskMetricsTest {
             assertThat(sensor, is(expectedSensor));
         }
     }
-}
\ No newline at end of file
+}
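
The two deleted tests here covered the task-level input-buffer-bytes-total and cache-size-bytes-total sensors that KIP-770 added. For reference, a minimal sketch of registering a last-value sensor with the Kafka metrics library (group name and tags are illustrative; this is not the removed TaskMetrics code):

    import java.util.Collections;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Value;

    final class ValueSensorSketch {
        // A gauge-like sensor that reports whatever value was recorded last.
        static Sensor valueSensor(final Metrics metrics, final String name) {
            final Sensor sensor = metrics.sensor(name);
            sensor.add(
                new MetricName(name, "stream-task-metrics",      // illustrative group
                               "last recorded value", Collections.emptyMap()),
                new Value());
            return sensor;
        }
    }

Calling sensor.record(bytes) then updates the exported metric value.
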
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 4d2022ca5af..6d43b4cfc8b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
@@ -33,10 +32,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -48,12 +43,11 @@ public class NamedCacheTest {
 
     private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
     private NamedCache cache;
-    private final Metrics innerMetrics = new Metrics();
-    private final StreamsMetricsImpl metrics = new MockStreamsMetrics(innerMetrics);
-    private final MetricName cacheSizeBytesTotal = new MetricName("cache-size-bytes-total", "stream-task-metrics", "", mkMap(mkEntry("thread-id", "Test worker"), mkEntry("task-id", "dummy")));
 
     @Before
     public void setUp() {
+        final Metrics innerMetrics = new Metrics();
+        final StreamsMetricsImpl metrics = new MockStreamsMetrics(innerMetrics);
         cache = new NamedCache("dummy-name", metrics);
     }
 
@@ -88,7 +82,6 @@ public class NamedCacheTest {
         cache.put(Bytes.wrap(new byte[]{1}), value);
         cache.put(Bytes.wrap(new byte[]{2}), value);
         final long size = cache.sizeInBytes();
-        assertThat(metrics.metrics().get(cacheSizeBytesTotal).metricValue(), is((double) size));
         // 1 byte key + 24 bytes overhead
         assertEquals((value.size() + 25) * 3, size);
     }
@@ -121,7 +114,6 @@ public class NamedCacheTest {
         final LRUCacheEntry deleted = cache.delete(Bytes.wrap(new byte[]{0}));
         assertArrayEquals(new byte[] {10}, deleted.value());
         assertEquals(0, cache.sizeInBytes());
-        assertThat(metrics.metrics().get(cacheSizeBytesTotal).metricValue(), is((double) 0));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 1e06b4ab314..8a402ab9abf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -72,7 +72,7 @@ public class BrokerCompatibilityTest {
         streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
-        streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingMode);
         final int timeout = 6000;
         streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index ae9d7527d7a..3b1aa4478f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -106,7 +106,7 @@ public class EosTestClient extends SmokeTestUtil {
         props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, Duration.ofMinutes(1).toMillis());
         props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, Integer.MAX_VALUE);
         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L); // increase commit interval to make sure a client is killed having an open transaction
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
index af3614c7326..b98f86141a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
@@ -87,7 +87,7 @@ public class StreamsNamedRepartitionTest {
         final Properties config = new Properties();
 
         config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsNamedRepartitionTest");
-        config.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
+        config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
         config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
index 95945b1b446..714aa110ef3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
@@ -110,7 +110,7 @@ public class StreamsOptimizedTest {
 
 
         config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsOptimizedTest");
-        config.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
+        config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
         config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         config.setProperty(StreamsConfig.adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), "100");
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
index 2568b498c97..3c693cc7135 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -67,7 +67,7 @@ public class StreamsStandByReplicaTest {
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
         streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
-        streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index c918a491ae7..9e7e285bd10 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -117,7 +117,6 @@ import java.util.regex.Pattern;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
-import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
 /**
@@ -156,7 +155,7 @@ import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
  *
  * <p> Note that the {@code TopologyTestDriver} processes input records synchronously.
  * This implies that {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit.interval.ms} and
- * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache.max.bytes.buffering} configuration have no effect.
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache.max.bytes.buffering} configuration have no effect.
  * The driver behaves as if both configs would be set to zero, i.e., as if a "commit" (and thus "flush") would happen
  * after each input record.
  *
@@ -310,7 +309,6 @@ public class TopologyTestDriver implements Closeable {
      * @param config the configuration for the topology
      * @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
      */
-    @SuppressWarnings({"unchecked", "deprecation"})
     private TopologyTestDriver(final InternalTopologyBuilder builder,
                                final Properties config,
                                final long initialWallClockTimeMs) {
@@ -331,7 +329,7 @@ public class TopologyTestDriver implements Closeable {
 
         final ThreadCache cache = new ThreadCache(
             logContext,
-            Math.max(0, getTotalCacheSize(streamsConfig)),
+            Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
             streamsMetrics
         );
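
After the revert, the test driver sizes its ThreadCache straight from the single cache.max.bytes.buffering value instead of going through the KIP-770 getTotalCacheSize resolution. A sketch of the restored lookup in isolation (an illustrative wrapper, not driver code):

    import org.apache.kafka.streams.StreamsConfig;

    final class CacheSizeLookup {
        // Read the configured cache budget, clamping negatives to zero the same
        // way the driver does before handing it to the ThreadCache.
        static long cacheSize(final StreamsConfig config) {
            return Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
        }
    }
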