You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/03/22 00:18:00 UTC
[kafka] branch trunk updated: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes" (#11796)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0924fd3 KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes" (#11796)
0924fd3 is described below
commit 0924fd3f9f75c446310ed1e97b44bbc3f33c6c31
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Tue Mar 22 05:46:00 2022 +0530
KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes" (#11796)
Implements KIP-770
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../examples/pageview/PageViewTypedDemo.java | 2 +-
.../examples/pageview/PageViewUntypedDemo.java | 2 +-
.../examples/temperature/TemperatureDemo.java | 2 +-
.../streams/examples/wordcount/WordCountDemo.java | 2 +-
.../examples/wordcount/WordCountProcessorDemo.java | 2 +-
.../wordcount/WordCountTransformerDemo.java | 2 +-
.../org/apache/kafka/streams/KafkaStreams.java | 76 ++++---
.../org/apache/kafka/streams/StreamsConfig.java | 46 ++++
.../org/apache/kafka/streams/TopologyConfig.java | 43 +++-
.../kafka/streams/kstream/CogroupedKStream.java | 8 +-
.../kafka/streams/kstream/KGroupedStream.java | 20 +-
.../kafka/streams/kstream/KGroupedTable.java | 22 +-
.../kstream/SessionWindowedCogroupedKStream.java | 8 +-
.../streams/kstream/SessionWindowedKStream.java | 24 +--
.../kstream/TimeWindowedCogroupedKStream.java | 8 +-
.../kafka/streams/kstream/TimeWindowedKStream.java | 24 +--
.../processor/internals/PartitionGroup.java | 81 ++++---
.../streams/processor/internals/RecordQueue.java | 37 ++++
.../streams/processor/internals/StreamTask.java | 15 +-
.../streams/processor/internals/StreamThread.java | 40 +++-
.../kafka/streams/processor/internals/Task.java | 1 +
.../streams/processor/internals/TaskManager.java | 19 ++
.../processor/internals/metrics/TaskMetrics.java | 37 ++++
.../kafka/streams/state/internals/NamedCache.java | 10 +
.../org/apache/kafka/streams/KafkaStreamsTest.java | 9 +-
.../apache/kafka/streams/StreamsConfigTest.java | 30 +++
.../integration/AbstractJoinIntegrationTest.java | 2 +-
.../integration/AbstractResetIntegrationTest.java | 2 +-
.../integration/AdjustStreamThreadCountTest.java | 89 +++++++-
.../integration/EmitOnChangeIntegrationTest.java | 175 ++++++++++++++++
.../streams/integration/EosIntegrationTest.java | 6 +-
.../integration/EosV2UpgradeIntegrationTest.java | 2 +-
.../integration/ErrorHandlingIntegrationTest.java | 161 ++++++++++++++
.../FineGrainedAutoResetIntegrationTest.java | 4 +-
.../GlobalKTableEOSIntegrationTest.java | 2 +-
.../integration/GlobalKTableIntegrationTest.java | 2 +-
.../integration/GlobalThreadShutDownOrderTest.java | 2 +-
.../integration/InternalTopicIntegrationTest.java | 2 +-
.../KStreamAggregationDedupIntegrationTest.java | 2 +-
.../KStreamAggregationIntegrationTest.java | 2 +-
.../KStreamRepartitionIntegrationTest.java | 2 +-
...yInnerJoinCustomPartitionerIntegrationTest.java | 1 +
...bleForeignKeyInnerJoinMultiIntegrationTest.java | 2 +-
.../KTableSourceTopicRestartIntegrationTest.java | 2 +-
.../integration/MetricsIntegrationTest.java | 6 +-
.../integration/NamedTopologyIntegrationTest.java | 2 +-
.../OptimizedKTableIntegrationTest.java | 2 +-
.../integration/QueryableStateIntegrationTest.java | 2 +-
.../integration/RegexSourceIntegrationTest.java | 2 +-
.../integration/RestoreIntegrationTest.java | 2 +-
.../integration/RocksDBMetricsIntegrationTest.java | 2 +-
.../integration/StandbyTaskEOSIntegrationTest.java | 2 +-
.../integration/StoreUpgradeIntegrationTest.java | 2 +-
...bleJoinTopologyOptimizationIntegrationTest.java | 2 +-
.../kstream/internals/KTableFilterTest.java | 4 +-
.../internals/SessionWindowedKStreamImplTest.java | 2 +-
.../internals/InternalTopologyBuilderTest.java | 18 +-
.../processor/internals/PartitionGroupTest.java | 101 +++++++++
.../internals/RepartitionOptimizingTest.java | 2 +-
.../RepartitionWithMergeOptimizingTest.java | 2 +-
.../processor/internals/StandbyTaskTest.java | 1 -
.../processor/internals/StreamTaskTest.java | 1 +
.../processor/internals/StreamThreadTest.java | 232 ++++++++++++++++++++-
.../internals/metrics/TaskMetricsTest.java | 41 ++++
.../streams/state/internals/NamedCacheTest.java | 12 +-
.../streams/tests/BrokerCompatibilityTest.java | 2 +-
.../apache/kafka/streams/tests/EosTestClient.java | 2 +-
.../streams/tests/StreamsNamedRepartitionTest.java | 2 +-
.../kafka/streams/tests/StreamsOptimizedTest.java | 2 +-
.../streams/tests/StreamsStandByReplicaTest.java | 2 +-
.../apache/kafka/streams/TopologyTestDriver.java | 5 +-
71 files changed, 1287 insertions(+), 198 deletions(-)
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index a5086de..be54baf 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -179,7 +179,7 @@ public class PageViewTypedDemo {
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index cdb3639..8fc8744 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -60,7 +60,7 @@ public class PageViewUntypedDemo {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index 3dc8eda..6e40fa0 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -77,7 +77,7 @@ public class TemperatureDemo {
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
final Duration duration24Hours = Duration.ofHours(24);
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 4ca5d73..d290c66 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -61,7 +61,7 @@ public final class WordCountDemo {
}
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 014923f..6204c42 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -108,7 +108,7 @@ public final class WordCountProcessorDemo {
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
index 028d317..8e80e92 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
@@ -131,7 +131,7 @@ public final class WordCountTransformerDemo {
}
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-transformer");
props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 039fdaf..369bc56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -174,6 +174,7 @@ public class KafkaStreams implements AutoCloseable {
protected final Admin adminClient;
private final StreamsMetricsImpl streamsMetrics;
private final long totalCacheSize;
+ private final long inputBufferMaxBytes;
private final StreamStateListener streamStateListener;
private final StateRestoreListener delegatingStateRestoreListener;
private final Map<Long, StreamThread.State> threadState;
@@ -938,9 +939,9 @@ public class KafkaStreams implements AutoCloseable {
streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
delegatingStateRestoreListener = new DelegatingStateRestoreListener();
- totalCacheSize = applicationConfigs.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
+ totalCacheSize = applicationConfigs.getTotalCacheSize();
+ inputBufferMaxBytes = applicationConfigs.getLong(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG);
final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs);
- final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads);
GlobalStreamThread.State globalThreadState = null;
if (hasGlobalTopology) {
@@ -950,7 +951,7 @@ public class KafkaStreams implements AutoCloseable {
applicationConfigs,
clientSupplier.getGlobalConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId)),
stateDirectory,
- cacheSizePerThread,
+ 0L,
streamsMetrics,
time,
globalThreadId,
@@ -971,14 +972,16 @@ public class KafkaStreams implements AutoCloseable {
queryableStoreProvider = new QueryableStoreProvider(globalStateStoreProvider);
for (int i = 1; i <= numStreamThreads; i++) {
- createAndAddStreamThread(cacheSizePerThread, i);
+ createAndAddStreamThread(0L, 0L, i);
}
+ // Initially, all Stream Threads are created with 0 cache size and max buffer size and then resized here.
+ resizeThreadCacheAndBufferMemory(numStreamThreads);
stateDirCleaner = setupStateDirCleaner();
rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs);
}
- private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final int threadIdx) {
+ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final long maxBufferSizePerThread, final int threadIdx) {
final StreamThread streamThread = StreamThread.create(
topologyMetadata,
applicationConfigs,
@@ -990,7 +993,7 @@ public class KafkaStreams implements AutoCloseable {
time,
streamsMetadataState,
cacheSizePerThread,
-
+ maxBufferSizePerThread,
stateDirectory,
delegatingStateRestoreListener,
threadIdx,
@@ -1027,7 +1030,7 @@ public class KafkaStreams implements AutoCloseable {
* Since the number of stream threads increases, the sizes of the caches in the new stream thread
* and the existing stream threads are adapted so that the sum of the cache sizes over all stream
* threads does not exceed the total cache size specified in configuration
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
* <p>
* Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
*
@@ -1038,14 +1041,15 @@ public class KafkaStreams implements AutoCloseable {
final StreamThread streamThread;
synchronized (changeThreadCount) {
final int threadIdx = getNextThreadIndex();
- final int numLiveThreads = getNumLiveStreamThreads();
- final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
- log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
- threadIdx, numLiveThreads + 1, cacheSizePerThread);
- resizeThreadCache(cacheSizePerThread);
// Creating thread should hold the lock in order to avoid duplicate thread index.
// If the duplicate index happen, the metadata of thread may be duplicate too.
- streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
+ // Also, we create the new thread with initial values of cache size and max buffer size as 0
+ // and then resize them later
+ streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
+ final int numLiveThreads = getNumLiveStreamThreads();
+ resizeThreadCacheAndBufferMemory(numLiveThreads + 1);
+ log.info("Adding StreamThread-{}, there are now {} threads with cache size/max buffer size values as {} per thread.",
+ threadIdx, numLiveThreads + 1, getThreadCacheAndBufferMemoryString());
}
synchronized (stateLock) {
@@ -1056,9 +1060,9 @@ public class KafkaStreams implements AutoCloseable {
log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state);
streamThread.shutdown();
threads.remove(streamThread);
- final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
- log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
- resizeThreadCache(cacheSizePerThread);
+ resizeThreadCacheAndBufferMemory(getNumLiveStreamThreads());
+ log.info("Resizing thread cache and max buffer size per thread since new thread can not be " +
+ "started, cache size/max buffer size per thread is {}", getThreadCacheAndBufferMemoryString());
return Optional.empty();
}
}
@@ -1076,7 +1080,7 @@ public class KafkaStreams implements AutoCloseable {
* <p>
* Since the number of stream threads decreases, the sizes of the caches in the remaining stream
* threads are adapted so that the sum of the cache sizes over all stream threads equals the total
- * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
*
* @return name of the removed stream thread or empty if a stream thread could not be removed because
* no stream threads are alive
@@ -1091,9 +1095,10 @@ public class KafkaStreams implements AutoCloseable {
* The removed stream thread is gracefully shut down. This method does not specify which stream
* thread is shut down.
* <p>
- * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
- * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
- * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * Since the number of stream threads decreases, the cache sizes and input buffer sizes of the remaining stream
+ * threads are adapted so that their sums over all stream threads equal the total cache size and the total
+ * input buffer size specified in configurations {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG} and
+ * {@link StreamsConfig#INPUT_BUFFER_MAX_BYTES_CONFIG} respectively.
*
* @param timeout The length of time to wait for the thread to shutdown
* @throws org.apache.kafka.common.errors.TimeoutException if the thread does not stop in time
@@ -1133,16 +1138,14 @@ public class KafkaStreams implements AutoCloseable {
}
} else {
log.info("{} is the last remaining thread and must remove itself, therefore we cannot wait "
- + "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
+ + "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
}
-
- final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
- log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread);
- resizeThreadCache(cacheSizePerThread);
+ resizeThreadCacheAndBufferMemory(getNumLiveStreamThreads());
+ log.info("Resizing thread cache/max buffer size due to removal of thread {}, new cache size/max buffer size per thread is {}", streamThread.getName(), getThreadCacheAndBufferMemoryString());
if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get());
final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
- final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult =
+ final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult =
adminClient.removeMembersFromConsumerGroup(
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
new RemoveMembersFromConsumerGroupOptions(membersToRemove)
@@ -1241,15 +1244,22 @@ public class KafkaStreams implements AutoCloseable {
}
}
- private long getCacheSizePerThread(final int numStreamThreads) {
- if (numStreamThreads == 0) {
- return totalCacheSize;
- }
- return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
+ private String getThreadCacheAndBufferMemoryString() {
+ final StreamThread streamThread = threads.get(0);
+ return streamThread.getCacheSize() + "/" + streamThread.getMaxBufferSize();
}
- private void resizeThreadCache(final long cacheSizePerThread) {
- processStreamThread(thread -> thread.resizeCache(cacheSizePerThread));
+ private void resizeThreadCacheAndBufferMemory(final int numStreamThreads) {
+ final long cacheSizePerThread;
+ final long inputBufferMaxBytesPerThread;
+ if (numStreamThreads == 0) {
+ cacheSizePerThread = totalCacheSize;
+ inputBufferMaxBytesPerThread = inputBufferMaxBytes;
+ } else {
+ cacheSizePerThread = totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
+ inputBufferMaxBytesPerThread = inputBufferMaxBytes / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
+ }
+ processStreamThread(thread -> thread.resizeCacheAndBufferMemory(cacheSizePerThread, inputBufferMaxBytesPerThread));
if (globalStreamThread != null) {
globalStreamThread.resize(cacheSizePerThread);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 58ee252..e9e0cca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -369,18 +369,30 @@ public class StreamsConfig extends AbstractConfig {
/** {@code buffered.records.per.partition} */
@SuppressWarnings("WeakerAccess")
+ @Deprecated
public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
public static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum number of records to buffer per partition.";
+ /** {@code input.buffer.max.bytes} */
+ @SuppressWarnings("WeakerAccess")
+ public static final String INPUT_BUFFER_MAX_BYTES_CONFIG = "input.buffer.max.bytes";
+ public static final String INPUT_BUFFER_MAX_BYTES_DOC = "Maximum bytes of records to buffer across all threads";
+
/** {@code built.in.metrics.version} */
public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version";
private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the built-in metrics to use.";
/** {@code cache.max.bytes.buffering} */
@SuppressWarnings("WeakerAccess")
+ @Deprecated
public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
+ /** {@code statestore.cache.max.bytes} */
+ @SuppressWarnings("WeakerAccess")
+ public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes";
+ public static final String STATESTORE_CACHE_MAX_BYTES_DOC = "Maximum number of memory bytes to be used for statestore cache across all threads";
+
/** {@code client.id} */
@SuppressWarnings("WeakerAccess")
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
@@ -688,6 +700,12 @@ public class StreamsConfig extends AbstractConfig {
atLeast(0),
Importance.MEDIUM,
CACHE_MAX_BYTES_BUFFERING_DOC)
+ .define(STATESTORE_CACHE_MAX_BYTES_CONFIG,
+ Type.LONG,
+ 10 * 1024 * 1024L,
+ atLeast(0),
+ Importance.MEDIUM,
+ STATESTORE_CACHE_MAX_BYTES_DOC)
.define(CLIENT_ID_CONFIG,
Type.STRING,
"",
@@ -788,6 +806,11 @@ public class StreamsConfig extends AbstractConfig {
in(NO_OPTIMIZATION, OPTIMIZE),
Importance.MEDIUM,
TOPOLOGY_OPTIMIZATION_DOC)
+ .define(INPUT_BUFFER_MAX_BYTES_CONFIG,
+ Type.LONG,
+ 512 * 1024 * 1024,
+ Importance.MEDIUM,
+ INPUT_BUFFER_MAX_BYTES_DOC)
// LOW
@@ -1513,6 +1536,29 @@ public class StreamsConfig extends AbstractConfig {
return props;
}
+ public long getTotalCacheSize() {
+ // both deprecated and new config set. Warn and use the new one.
+ if (originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
+ if (!getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) {
+ log.warn("Both deprecated config {} and the new config {} are set, hence {} is ignored and {} is used instead.",
+ CACHE_MAX_BYTES_BUFFERING_CONFIG,
+ STATESTORE_CACHE_MAX_BYTES_CONFIG,
+ CACHE_MAX_BYTES_BUFFERING_CONFIG,
+ STATESTORE_CACHE_MAX_BYTES_CONFIG);
+ }
+ return getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+ } else if (originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG)) {
+ // only deprecated config set.
+ log.warn("Deprecated config {} is set, and will be used; we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
+ CACHE_MAX_BYTES_BUFFERING_CONFIG,
+ STATESTORE_CACHE_MAX_BYTES_CONFIG,
+ CACHE_MAX_BYTES_BUFFERING_CONFIG);
+ return getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+ }
+ // only new or no config set. Use default or user specified value.
+ return getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+ }
+
/**
* Get the configs for the {@link Admin admin client}.
* @param clientId clientId
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index f046e71..de8aec9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -32,9 +32,10 @@ import java.util.Properties;
import java.util.function.Supplier;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
-import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_DOC;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC;
@@ -55,15 +56,21 @@ import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
* determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
* topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method.
*/
+@SuppressWarnings("deprecation")
public class TopologyConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
static {
CONFIG = new ConfigDef()
- .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
- Type.INT,
+ .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+ Type.INT,
+ null,
+ Importance.LOW,
+ BUFFERED_RECORDS_PER_PARTITION_DOC)
+ .define(STATESTORE_CACHE_MAX_BYTES_CONFIG,
+ Type.LONG,
null,
- Importance.LOW,
- BUFFERED_RECORDS_PER_PARTITION_DOC)
+ Importance.MEDIUM,
+ CACHE_MAX_BYTES_BUFFERING_DOC)
.define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
Type.LONG,
null,
@@ -129,14 +136,32 @@ public class TopologyConfig extends AbstractConfig {
maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
} else {
- maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+ maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+ ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
}
- if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+ if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+ cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+ log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used",
+ topologyName,
+ CACHE_MAX_BYTES_BUFFERING_CONFIG,
+ STATESTORE_CACHE_MAX_BYTES_CONFIG,
+ CACHE_MAX_BYTES_BUFFERING_CONFIG,
+ STATESTORE_CACHE_MAX_BYTES_CONFIG,
+ cacheSize);
+ } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
- log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+ log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " +
+ "we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
+ topologyName,
+ CACHE_MAX_BYTES_BUFFERING_CONFIG,
+ cacheSize,
+ STATESTORE_CACHE_MAX_BYTES_CONFIG,
+ CACHE_MAX_BYTES_BUFFERING_CONFIG);
+ } else if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides)) {
+ cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
} else {
- cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+ cacheSize = globalAppConfigs.getTotalCacheSize();
}
if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index 051396f..b0f1dec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -78,7 +78,7 @@ public interface CogroupedKStream<K, VOut> {
* same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -128,7 +128,7 @@ public interface CogroupedKStream<K, VOut> {
* same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -179,7 +179,7 @@ public interface CogroupedKStream<K, VOut> {
* same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -232,7 +232,7 @@ public interface CogroupedKStream<K, VOut> {
* same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore} it must be obtained via
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 072558c..513d94d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -53,7 +53,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by
@@ -81,7 +81,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by
@@ -112,7 +112,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -158,7 +158,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -211,7 +211,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
*
* <p>
@@ -262,7 +262,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -326,7 +326,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -385,7 +385,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
*
* <p>
@@ -431,7 +431,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -490,7 +490,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 06d12e1..5733aef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -52,7 +52,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -95,7 +95,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -138,7 +138,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -167,7 +167,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -223,7 +223,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -296,7 +296,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -368,7 +368,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -434,7 +434,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -518,7 +518,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@@ -604,7 +604,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
@@ -674,7 +674,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
index b7e3b07..eeeb3e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
@@ -77,7 +77,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -122,7 +122,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -166,7 +166,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
@@ -226,7 +226,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index 1b7a363..c561b62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -65,7 +65,7 @@ public interface SessionWindowedKStream<K, V> {
* the same session and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -95,7 +95,7 @@ public interface SessionWindowedKStream<K, V> {
* the same session and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -126,7 +126,7 @@ public interface SessionWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
@@ -172,7 +172,7 @@ public interface SessionWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
@@ -233,7 +233,7 @@ public interface SessionWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -282,7 +282,7 @@ public interface SessionWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -330,7 +330,7 @@ public interface SessionWindowedKStream<K, V> {
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
@@ -391,7 +391,7 @@ public interface SessionWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
@@ -459,7 +459,7 @@ public interface SessionWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -504,7 +504,7 @@ public interface SessionWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -549,7 +549,7 @@ public interface SessionWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
@@ -609,7 +609,7 @@ public interface SessionWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
index e4178bc..a46da05 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
@@ -75,7 +75,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -115,7 +115,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -156,7 +156,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
@@ -213,7 +213,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index c015e79..8bce98a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -65,7 +65,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -95,7 +95,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -126,7 +126,7 @@ public interface TimeWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
@@ -175,7 +175,7 @@ public interface TimeWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
@@ -236,7 +236,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -281,7 +281,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -326,7 +326,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
@@ -387,7 +387,7 @@ public interface TimeWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
@@ -457,7 +457,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -502,7 +502,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@@ -547,7 +547,7 @@ public interface TimeWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
@@ -610,7 +610,7 @@ public interface TimeWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
- * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 199bc0e..dd1257f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -64,10 +64,12 @@ public class PartitionGroup {
private final Sensor enforcedProcessingSensor;
private final long maxTaskIdleMs;
private final Sensor recordLatenessSensor;
+ private final Sensor totalBytesSensor;
private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
private long streamTime;
private int totalBuffered;
+ private long totalBytesBuffered;
private boolean allBuffered;
private final Map<TopicPartition, Long> idlePartitionDeadlines = new HashMap<>();
@@ -92,6 +94,7 @@ public class PartitionGroup {
final Function<TopicPartition, OptionalLong> lagProvider,
final Sensor recordLatenessSensor,
final Sensor enforcedProcessingSensor,
+ final Sensor totalBytesSensor,
final long maxTaskIdleMs) {
this.logger = logContext.logger(PartitionGroup.class);
nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
@@ -100,6 +103,7 @@ public class PartitionGroup {
this.enforcedProcessingSensor = enforcedProcessingSensor;
this.maxTaskIdleMs = maxTaskIdleMs;
this.recordLatenessSensor = recordLatenessSensor;
+ this.totalBytesSensor = totalBytesSensor;
totalBuffered = 0;
allBuffered = false;
streamTime = RecordQueue.UNKNOWN;
@@ -118,11 +122,11 @@ public class PartitionGroup {
}
}
logger.trace("Ready for processing because max.task.idle.ms is disabled." +
- "\n\tThere may be out-of-order processing for this task as a result." +
- "\n\tBuffered partitions: {}" +
- "\n\tNon-buffered partitions: {}",
- bufferedPartitions,
- emptyPartitions);
+ "\n\tThere may be out-of-order processing for this task as a result." +
+ "\n\tBuffered partitions: {}" +
+ "\n\tNon-buffered partitions: {}",
+ bufferedPartitions,
+ emptyPartitions);
}
return true;
}
@@ -151,9 +155,9 @@ public class PartitionGroup {
// must wait to poll the data we know to be on the broker
idlePartitionDeadlines.remove(partition);
logger.trace(
- "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
- partition,
- fetchedLag.getAsLong()
+ "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
+ partition,
+ fetchedLag.getAsLong()
);
return false;
} else {
@@ -167,11 +171,11 @@ public class PartitionGroup {
final long deadline = idlePartitionDeadlines.get(partition);
if (wallClockTime < deadline) {
logger.trace(
- "Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).",
- partition,
- wallClockTime,
- maxTaskIdleMs,
- deadline
+ "Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).",
+ partition,
+ wallClockTime,
+ maxTaskIdleMs,
+ deadline
);
return false;
} else {
@@ -193,15 +197,15 @@ public class PartitionGroup {
} else {
enforcedProcessingSensor.record(1.0d, wallClockTime);
logger.trace("Continuing to process although some partitions are empty on the broker." +
- "\n\tThere may be out-of-order processing for this task as a result." +
- "\n\tPartitions with local data: {}." +
- "\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}." +
- "\n\tConfigured max.task.idle.ms: {}." +
- "\n\tCurrent wall-clock time: {}.",
- queued,
- enforced,
- maxTaskIdleMs,
- wallClockTime);
+ "\n\tThere may be out-of-order processing for this task as a result." +
+ "\n\tPartitions with local data: {}." +
+ "\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}." +
+ "\n\tConfigured max.task.idle.ms: {}." +
+ "\n\tCurrent wall-clock time: {}.",
+ queued,
+ enforced,
+ maxTaskIdleMs,
+ wallClockTime);
return true;
}
}
@@ -225,6 +229,7 @@ public class PartitionGroup {
if (!newInputPartitions.contains(topicPartition)) {
// if partition is removed should delete its queue
totalBuffered -= queueEntry.getValue().size();
+ totalBytesBuffered -= queueEntry.getValue().getTotalBytesBuffered();
queuesIterator.remove();
removedPartitions.add(topicPartition);
}
@@ -260,12 +265,17 @@ public class PartitionGroup {
info.queue = queue;
if (queue != null) {
+ // get the buffer size of queue before poll
+ final long oldBufferSize = queue.getTotalBytesBuffered();
// get the first record from this queue.
record = queue.poll();
+ // After polling, the buffer size would have reduced.
+ final long newBufferSize = queue.getTotalBytesBuffered();
if (record != null) {
--totalBuffered;
-
+ totalBytesBuffered -= oldBufferSize - newBufferSize;
+ totalBytesSensor.record(totalBytesBuffered);
if (queue.isEmpty()) {
// if a certain queue has been drained, reset the flag
allBuffered = false;
@@ -301,7 +311,9 @@ public class PartitionGroup {
}
final int oldSize = recordQueue.size();
+ final long oldBufferSize = recordQueue.getTotalBytesBuffered();
final int newSize = recordQueue.addRawRecords(rawRecords);
+ final long newBufferSize = recordQueue.getTotalBytesBuffered();
// add this record queue to be considered for processing in the future if it was empty before
if (oldSize == 0 && newSize > 0) {
@@ -316,7 +328,8 @@ public class PartitionGroup {
}
totalBuffered += newSize - oldSize;
-
+ totalBytesBuffered += newBufferSize - oldBufferSize;
+ totalBytesSensor.record(totalBytesBuffered);
return newSize;
}
@@ -354,12 +367,20 @@ public class PartitionGroup {
return recordQueue.size();
}
+ Set<TopicPartition> getNonEmptyTopicPartitions() {
+ final Set<TopicPartition> nonEmptyTopicPartitions = new HashSet<>();
+ for (final RecordQueue recordQueue : nonEmptyQueuesByTime) {
+ nonEmptyTopicPartitions.add(recordQueue.partition());
+ }
+ return nonEmptyTopicPartitions;
+ }
+
int numBuffered() {
return totalBuffered;
}
- boolean allPartitionsBufferedLocally() {
- return allBuffered;
+ long totalBytesBuffered() {
+ return totalBytesBuffered;
}
void clear() {
@@ -370,4 +391,10 @@ public class PartitionGroup {
totalBuffered = 0;
streamTime = RecordQueue.UNKNOWN;
}
-}
+
+ // The methods below are for testing only.
+
+ boolean allPartitionsBufferedLocally() {
+ return allBuffered;
+ }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 1c01966..90d67a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -18,8 +18,10 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
@@ -50,6 +52,8 @@ public class RecordQueue {
private long partitionTime = UNKNOWN;
private final Sensor droppedRecordsSensor;
+ private long totalBytesBuffered;
+ private long headRecordSizeInBytes;
RecordQueue(final TopicPartition partition,
final SourceNode<?, ?> source,
@@ -74,6 +78,8 @@ public class RecordQueue {
droppedRecordsSensor
);
this.log = logContext.logger(RecordQueue.class);
+ this.totalBytesBuffered = 0L;
+ this.headRecordSizeInBytes = 0L;
}
void setPartitionTime(final long partitionTime) {
@@ -98,6 +104,25 @@ public class RecordQueue {
return partition;
}
+ private long sizeInBytes(final ConsumerRecord<byte[], byte[]> record) {
+ long headerSizeInBytes = 0L;
+
+ for (final Header header: record.headers().toArray()) {
+ headerSizeInBytes += Utils.utf8(header.key()).length;
+ if (header.value() != null) {
+ headerSizeInBytes += header.value().length;
+ }
+ }
+
+ return record.serializedKeySize() +
+ record.serializedValueSize() +
+ 8L + // timestamp
+ 8L + // offset
+ Utils.utf8(record.topic()).length +
+ 4L + // partition
+ headerSizeInBytes;
+ }
+
/**
* Add a batch of {@link ConsumerRecord} into the queue
*
@@ -107,6 +132,7 @@ public class RecordQueue {
int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
fifoQueue.addLast(rawRecord);
+ this.totalBytesBuffered += sizeInBytes(rawRecord);
}
updateHead();
@@ -121,7 +147,9 @@ public class RecordQueue {
*/
public StampedRecord poll() {
final StampedRecord recordToReturn = headRecord;
+ totalBytesBuffered -= headRecordSizeInBytes;
headRecord = null;
+ headRecordSizeInBytes = 0L;
partitionTime = Math.max(partitionTime, recordToReturn.timestamp);
updateHead();
@@ -167,6 +195,7 @@ public class RecordQueue {
public void clear() {
fifoQueue.clear();
headRecord = null;
+ headRecordSizeInBytes = 0L;
partitionTime = UNKNOWN;
}
@@ -206,6 +235,7 @@ public class RecordQueue {
continue;
}
headRecord = new StampedRecord(deserialized, timestamp);
+ headRecordSizeInBytes = sizeInBytes(raw);
}
// if all records in the FIFO queue are corrupted, make the last one the headRecord
@@ -221,4 +251,11 @@ public class RecordQueue {
long partitionTime() {
return partitionTime;
}
+
+ /**
+ * @return the total bytes buffered for this particular RecordQueue
+ */
+ long getTotalBytesBuffered() {
+ return totalBytesBuffered;
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4185fcf..b2df5ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -189,6 +189,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
createPartitionQueues(),
mainConsumer::currentLag,
TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics),
+ TaskMetrics.totalBytesSensor(threadId, taskId, streamsMetrics),
enforcedProcessingSensor,
maxTaskIdleMs
);
@@ -717,7 +718,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
- if (recordInfo.queue().size() == maxBufferedSize) {
+ // TODO: the maxBufferedSize != -1 check can be removed once the deprecated config buffered.records.per.partition is removed
+ if (maxBufferedSize != -1 && recordInfo.queue().size() == maxBufferedSize) {
mainConsumer.resume(singleton(partition));
}
@@ -971,7 +973,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
// if after adding these records, its partition queue's buffered size has been
// increased beyond the threshold, we can then pause the consumption for this partition
- if (newQueueSize > maxBufferedSize) {
+ // We do this only if the deprecated config buffered.records.per.partition is set
+ if (maxBufferedSize != -1 && newQueueSize > maxBufferedSize) {
mainConsumer.pause(singleton(partition));
}
}
@@ -1252,6 +1255,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return recordCollector;
}
+ Set<TopicPartition> getNonEmptyTopicPartitions() {
+ return this.partitionGroup.getNonEmptyTopicPartitions();
+ }
+
+ long totalBytesBuffered() {
+ return partitionGroup.totalBytesBuffered();
+ }
+
// below are visible for testing only
int numBuffered() {
return partitionGroup.numBuffered();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7401e53..ab4b094 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -318,6 +318,7 @@ public class StreamThread extends Thread {
// These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
+ private final AtomicLong maxBufferSizeBytes = new AtomicLong(-1L);
private final boolean eosEnabled;
public static StreamThread create(final TopologyMetadata topologyMetadata,
@@ -330,6 +331,7 @@ public class StreamThread extends Thread {
final Time time,
final StreamsMetadataState streamsMetadataState,
final long cacheSizeBytes,
+ final long maxBufferSizeBytes,
final StateDirectory stateDirectory,
final StateRestoreListener userStateRestoreListener,
final int threadIdx,
@@ -432,7 +434,8 @@ public class StreamThread extends Thread {
referenceContainer.nonFatalExceptionsToHandle,
shutdownErrorHook,
streamsUncaughtExceptionHandler,
- cache::resize
+ cache::resize,
+ maxBufferSizeBytes
);
return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
@@ -455,7 +458,8 @@ public class StreamThread extends Thread {
final Queue<StreamsException> nonFatalExceptionsToHandle,
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler,
- final java.util.function.Consumer<Long> cacheResizer) {
+ final java.util.function.Consumer<Long> cacheResizer,
+ final long maxBufferSizeBytes) {
super(threadId);
this.stateLock = new Object();
this.adminClient = adminClient;
@@ -524,6 +528,7 @@ public class StreamThread extends Thread {
this.numIterations = 1;
this.eosEnabled = eosEnabled(config);
+ this.maxBufferSizeBytes.set(maxBufferSizeBytes);
}
private static final class InternalConsumerConfig extends ConsumerConfig {
@@ -706,8 +711,17 @@ public class StreamThread extends Thread {
}
}
- public void resizeCache(final long size) {
- cacheResizeSize.set(size);
+ public void resizeCacheAndBufferMemory(final long cacheSize, final long maxBufferSize) {
+ cacheResizeSize.set(cacheSize);
+ maxBufferSizeBytes.set(maxBufferSize);
+ }
+
+ public long getCacheSize() {
+ return cacheResizeSize.get();
+ }
+
+ public long getMaxBufferSize() {
+ return maxBufferSizeBytes.get();
}
/**
@@ -782,6 +796,10 @@ public class StreamThread extends Thread {
totalProcessed += processed;
totalRecordsProcessedSinceLastSummary += processed;
+ final long bufferSize = taskManager.getInputBufferSizeInBytes();
+ if (bufferSize <= maxBufferSizeBytes.get()) {
+ mainConsumer.resume(mainConsumer.paused());
+ }
}
log.debug("Processed {} records with {} iterations; invoking punctuators if necessary",
@@ -899,7 +917,8 @@ public class StreamThread extends Thread {
}
}
- private long pollPhase() {
+ // Visible for testing
+ long pollPhase() {
final ConsumerRecords<byte[], byte[]> records;
log.debug("Invoking poll on main Consumer");
@@ -945,6 +964,17 @@ public class StreamThread extends Thread {
if (!records.isEmpty()) {
pollRecordsSensor.record(numRecords, now);
taskManager.addRecordsToTasks(records);
+ // Check buffer size after adding records to tasks
+ final long bufferSize = taskManager.getInputBufferSizeInBytes();
+ // Pausing partitions as the buffer size now exceeds max buffer size
+ if (bufferSize > maxBufferSizeBytes.get()) {
+ log.info("Buffered records size {} bytes exceeds {}. Pausing the consumer", bufferSize, maxBufferSizeBytes.get());
+ // Only non-empty partitions are paused here. The reason is that if a task has multiple partitions and
+ // some of them are empty, pausing the empty partitions as well would sacrifice ordered processing
+ // and could even lead to a temporary deadlock. More explanation can be found here:
+ // https://issues.apache.org/jira/browse/KAFKA-13152?focusedCommentId=17400647&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17400647
+ mainConsumer.pause(taskManager.nonEmptyPartitions());
+ }
}
while (!nonFatalExceptionsToHandle.isEmpty()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 3549ba2..fc3e6cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -246,4 +246,5 @@ public interface Task {
* @return This returns the time the task started idling. If it is not idling it returns empty.
*/
Optional<Long> timeCurrentIdlingStarted();
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index f158563..4bc8e43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1067,6 +1067,17 @@ public class TaskManager {
}
/**
+ * Fetch all non-empty partitions for pausing
+ */
+ Set<TopicPartition> nonEmptyPartitions() {
+ final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
+ for (final Task task : activeTaskIterable()) {
+ nonEmptyPartitions.addAll(((StreamTask) task).getNonEmptyTopicPartitions());
+ }
+ return nonEmptyPartitions;
+ }
+
+ /**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
* @throws TimeoutException if task.timeout.ms has been exceeded (non-EOS)
@@ -1169,6 +1180,14 @@ public class TaskManager {
}
}
+ long getInputBufferSizeInBytes() {
+ long bytesBuffered = 0L;
+ for (final Task task : activeTaskIterable()) {
+ bytesBuffered += ((StreamTask) task).totalBytesBuffered();
+ }
+ return bytesBuffered;
+ }
+
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
* @throws StreamsException if any task threw an exception while processing
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
index cfa1ac6..f173bac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
@@ -84,6 +84,11 @@ public class TaskMetrics {
private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
"from consumer and not yet processed for this active task";
+ private static final String INPUT_BUFFER_BYTES_TOTAL = "input-buffer-bytes-total";
+ private static final String INPUT_BUFFER_BYTES_TOTAL_DESCRIPTION = "The total number of bytes accumulated in this task's input buffer";
+ private static final String CACHE_SIZE_BYTES_TOTAL = "cache-size-bytes-total";
+ private static final String CACHE_SIZE_BYTES_TOTAL_DESCRIPTION = "The total size in bytes of this task's cache.";
+
public static Sensor processLatencySensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics) {
@@ -128,6 +133,38 @@ public class TaskMetrics {
return sensor;
}
+ public static Sensor totalBytesSensor(final String threadId,
+ final String taskId,
+ final StreamsMetricsImpl streamsMetrics) {
+ final String name = INPUT_BUFFER_BYTES_TOTAL;
+ final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, name, RecordingLevel.INFO);
+
+ addValueMetricToSensor(
+ sensor,
+ TASK_LEVEL_GROUP,
+ streamsMetrics.taskLevelTagMap(threadId, taskId),
+ name,
+ INPUT_BUFFER_BYTES_TOTAL_DESCRIPTION
+ );
+ return sensor;
+ }
+
+ public static Sensor totalCacheSizeBytesSensor(final String threadId,
+ final String taskId,
+ final StreamsMetricsImpl streamsMetrics) {
+ final String name = CACHE_SIZE_BYTES_TOTAL;
+ final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, name, Sensor.RecordingLevel.INFO);
+
+ addValueMetricToSensor(
+ sensor,
+ TASK_LEVEL_GROUP,
+ streamsMetrics.taskLevelTagMap(threadId, taskId),
+ name,
+ CACHE_SIZE_BYTES_TOTAL_DESCRIPTION
+ );
+ return sensor;
+ }
+
public static Sensor punctuateSensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index ecf063b..71fdd4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.internals.metrics.NamedCacheMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ class NamedCache {
private final StreamsMetricsImpl streamsMetrics;
private final Sensor hitRatioSensor;
+ private final Sensor totalCacheSizeSensor;
// internal stats
private long numReadHits = 0;
@@ -66,6 +68,11 @@ class NamedCache {
taskName,
storeName
);
+ totalCacheSizeSensor = TaskMetrics.totalCacheSizeBytesSensor(
+ Thread.currentThread().getName(),
+ taskName,
+ streamsMetrics
+ );
}
synchronized final String name() {
@@ -182,6 +189,7 @@ class NamedCache {
dirtyKeys.add(key);
}
currentSizeBytes += node.size();
+ totalCacheSizeSensor.record(currentSizeBytes);
}
synchronized long sizeInBytes() {
@@ -243,6 +251,7 @@ class NamedCache {
if (eldest.entry.isDirty()) {
flush(eldest);
}
+ totalCacheSizeSensor.record(currentSizeBytes);
}
synchronized LRUCacheEntry putIfAbsent(final Bytes key, final LRUCacheEntry value) {
@@ -269,6 +278,7 @@ class NamedCache {
remove(node);
dirtyKeys.remove(key);
currentSizeBytes -= node.size();
+ totalCacheSizeSensor.record(currentSizeBytes);
return node.entry();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 9b9a671..b5620d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -229,6 +229,7 @@ public class KafkaStreamsTest {
anyObject(Time.class),
anyObject(StreamsMetadataState.class),
anyLong(),
+ anyLong(),
anyObject(StateDirectory.class),
anyObject(StateRestoreListener.class),
anyInt(),
@@ -241,6 +242,10 @@ public class KafkaStreamsTest {
EasyMock.expect(StreamsConfigUtils.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes();
EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes();
EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes();
+ EasyMock.expect(streamThreadOne.getCacheSize()).andReturn(10485760L).anyTimes();
+ EasyMock.expect(streamThreadOne.getMaxBufferSize()).andReturn(536870912L).anyTimes();
+ EasyMock.expect(streamThreadTwo.getCacheSize()).andReturn(10485760L).anyTimes();
+ EasyMock.expect(streamThreadTwo.getMaxBufferSize()).andReturn(536870912L).anyTimes();
prepareStreamThread(streamThreadOne, 1, true);
prepareStreamThread(streamThreadTwo, 2, false);
@@ -289,6 +294,8 @@ public class KafkaStreamsTest {
EasyMock.expect(globalStreamThread.stillRunning()).andReturn(globalThreadState.get() == GlobalStreamThread.State.RUNNING).anyTimes();
globalStreamThread.join();
EasyMock.expectLastCall().anyTimes();
+ globalStreamThread.resize(EasyMock.anyLong());
+ EasyMock.expectLastCall().anyTimes();
PowerMock.replay(
StreamThread.class,
@@ -345,7 +352,7 @@ public class KafkaStreamsTest {
).anyTimes();
EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class), anyLong())).andStubReturn(true);
EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1);
- thread.resizeCache(EasyMock.anyLong());
+ thread.resizeCacheAndBufferMemory(EasyMock.anyLong(), EasyMock.anyLong());
EasyMock.expectLastCall().anyTimes();
thread.requestLeaveGroupDuringShutdown();
EasyMock.expectLastCall().anyTimes();
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index e569727..243a474 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1252,6 +1252,36 @@ public class StreamsConfigTest {
);
}
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldUseStateStoreCacheMaxBytesWhenBothOldAndNewConfigsAreSet() {
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100);
+ props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
+ final StreamsConfig config = new StreamsConfig(props);
+ assertEquals(config.getTotalCacheSize(), 100);
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() {
+ props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
+ final StreamsConfig config = new StreamsConfig(props);
+ assertEquals(config.getTotalCacheSize(), 10);
+ }
+
+ @Test
+ public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() {
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10);
+ final StreamsConfig config = new StreamsConfig(props);
+ assertEquals(config.getTotalCacheSize(), 10);
+ }
+
+ @Test
+ public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
+ final StreamsConfig config = new StreamsConfig(props);
+ assertEquals(config.getTotalCacheSize(), 10 * 1024 * 1024);
+ }
+
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index d41cec0..5f717e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -123,7 +123,7 @@ public abstract class AbstractJoinIntegrationTest {
void prepareEnvironment() throws InterruptedException {
if (!cacheEnabled) {
- STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ STREAMS_CONFIG.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
}
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index fd5da12..70bcef9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -147,7 +147,7 @@ public abstract class AbstractResetIntegrationTest {
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index 26edd69..1119818 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -375,7 +375,7 @@ public class AdjustStreamThreadCountTest {
final Properties props = new Properties();
props.putAll(properties);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totalCacheBytes);
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, totalCacheBytes);
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
addStreamStateChangeListener(kafkaStreams);
@@ -386,7 +386,32 @@ public class AdjustStreamThreadCountTest {
for (final String log : appender.getMessages()) {
// all 10 bytes should be available for remaining thread
- if (log.endsWith("Resizing thread cache due to thread removal, new cache size per thread is 10")) {
+ if (log.contains("Resizing thread cache/max buffer size due to removal of thread ") && log.contains(", new cache size/max buffer size per thread is 10/536870912")) {
+ return;
+ }
+ }
+ }
+ }
+ fail();
+ }
+
+ @Test
+ public void shouldResizeMaxBufferAfterThreadRemovalTimesOut() throws InterruptedException {
+ final long maxBufferBytes = 10L;
+ final Properties props = new Properties();
+ props.putAll(properties);
+ props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+ props.put(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG, maxBufferBytes);
+
+ try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
+ addStreamStateChangeListener(kafkaStreams);
+ startStreamsAndWaitForRunning(kafkaStreams);
+
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class)) {
+ assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ofSeconds(0)));
+ for (final String log : appender.getMessages()) {
+ // all 10 bytes of the input buffer should be available for the remaining thread
+ if (log.contains("Resizing thread cache/max buffer size due to removal of thread ") && log.contains(", new cache size/max buffer size per thread is 10485760/10")) {
return;
}
}
@@ -401,7 +426,7 @@ public class AdjustStreamThreadCountTest {
final Properties props = new Properties();
props.putAll(properties);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totalCacheBytes);
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, totalCacheBytes);
final AtomicBoolean injectError = new AtomicBoolean(false);
@@ -442,7 +467,63 @@ public class AdjustStreamThreadCountTest {
for (final String log : appender.getMessages()) {
// after we replace the thread there should be two remaining threads with 5 bytes each
- if (log.endsWith("Adding StreamThread-3, there will now be 2 live threads and the new cache size per thread is 5")) {
+ if (log.endsWith("Adding StreamThread-3, there are now 3 threads with cache size/max buffer size values as 3/178956970 per thread.")) {
+ return;
+ }
+ }
+ }
+ }
+ fail();
+ }
+
+ @Test
+ public void shouldResizeMaxBufferAfterThreadReplacement() throws InterruptedException {
+ final long totalCacheBytes = 10L;
+ final Properties props = new Properties();
+ props.putAll(properties);
+ props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+ props.put(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG, totalCacheBytes);
+
+ final AtomicBoolean injectError = new AtomicBoolean(false);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, String> stream = builder.stream(inputTopic);
+ stream.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
+ @Override
+ public void init(final ProcessorContext context) {
+ context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
+ if (Thread.currentThread().getName().endsWith("StreamThread-1") && injectError.get()) {
+ injectError.set(false);
+ throw new RuntimeException("BOOM");
+ }
+ });
+ }
+
+ @Override
+ public KeyValue<String, String> transform(final String key, final String value) {
+ return new KeyValue<>(key, value);
+ }
+
+ @Override
+ public void close() {
+ }
+ });
+
+ try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
+ addStreamStateChangeListener(kafkaStreams);
+ kafkaStreams.setUncaughtExceptionHandler(e -> StreamThreadExceptionResponse.REPLACE_THREAD);
+ startStreamsAndWaitForRunning(kafkaStreams);
+
+ stateTransitionHistory.clear();
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
+ injectError.set(true);
+ waitForCondition(() -> !injectError.get(), "StreamThread did not hit and reset the injected error");
+
+ waitForTransitionFromRebalancingToRunning();
+
+ for (final String log : appender.getMessages()) {
+ // the expected log reports 3 threads once the replacement is added, each with a max buffer size of 3 bytes (10 / 3)
+ if (log.endsWith("Adding StreamThread-3, there are now 3 threads with cache size/max buffer size values as 3495253/3 per thread.")) {
return;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
new file mode 100644
index 0000000..e0fcd4d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+
+@Category(IntegrationTest.class)
+public class EmitOnChangeIntegrationTest {
+
+ private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+ @BeforeClass
+ public static void startCluster() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterClass
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+ @Rule
+ public TestName testName = new TestName();
+
+ private static String inputTopic;
+ private static String inputTopic2;
+ private static String outputTopic;
+ private static String outputTopic2;
+ private static String appId = "";
+
+ @Before
+ public void setup() {
+ final String testId = safeUniqueTestName(getClass(), testName);
+ appId = "appId_" + testId;
+ inputTopic = "input" + testId;
+ inputTopic2 = "input2" + testId;
+ outputTopic = "output" + testId;
+ outputTopic2 = "output2" + testId;
+ IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic, inputTopic2, outputTopic2);
+ }
+
+ @Test
+ public void shouldEmitSameRecordAfterFailover() throws Exception {
+ final Properties properties = mkObjectProperties(
+ mkMap(
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+ mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
+ mkEntry(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0),
+ mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
+ mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
+ mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+ mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
+ )
+ );
+
+ final AtomicBoolean shouldThrow = new AtomicBoolean(true);
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.table(inputTopic, Materialized.as("test-store"))
+ .toStream()
+ .map((key, value) -> {
+ if (shouldThrow.compareAndSet(true, false)) {
+ throw new RuntimeException("Kaboom");
+ } else {
+ return new KeyValue<>(key, value);
+ }
+ })
+ .to(outputTopic);
+ builder.stream(inputTopic2).to(outputTopic2);
+
+ try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+ kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
+ StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputTopic,
+ Arrays.asList(
+ new KeyValue<>(1, "A"),
+ new KeyValue<>(1, "B")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ 0L);
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputTopic2,
+ Arrays.asList(
+ new KeyValue<>(1, "A"),
+ new KeyValue<>(1, "B")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ 0L);
+
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ StringDeserializer.class
+ ),
+ outputTopic,
+ Arrays.asList(
+ new KeyValue<>(1, "A"),
+ new KeyValue<>(1, "B")
+ )
+ );
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ StringDeserializer.class
+ ),
+ outputTopic2,
+ Arrays.asList(
+ new KeyValue<>(1, "A"),
+ new KeyValue<>(1, "B")
+ )
+ );
+ }
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 5a97579..82765f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -245,7 +245,7 @@ public class EosIntegrationTest {
final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
- properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
@@ -322,7 +322,7 @@ public class EosIntegrationTest {
final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
- properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -936,7 +936,7 @@ public class EosIntegrationTest {
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), maxPollIntervalMs);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), maxPollIntervalMs - 1);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs);
- properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
index b6aab86..9108bbf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
@@ -944,7 +944,7 @@ public class EosV2UpgradeIntegrationTest {
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) commitInterval);
properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), KeyPartitioner.class);
- properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
properties.put(InternalConfig.ASSIGNMENT_LISTENER, assignmentListener);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
new file mode 100644
index 0000000..b3c6124
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class ErrorHandlingIntegrationTest {
+
+ private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+ @BeforeClass
+ public static void startCluster() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterClass
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+ @Rule
+ public TestName testName = new TestName();
+
+ private final String testId = safeUniqueTestName(getClass(), testName);
+ private final String appId = "appId_" + testId;
+ private final Properties properties = props();
+
+ // Task 0
+ private final String inputTopic = "input" + testId;
+ private final String outputTopic = "output" + testId;
+ // Task 1
+ private final String errorInputTopic = "error-input" + testId;
+ private final String errorOutputTopic = "error-output" + testId;
+
+ @Before
+ public void setup() {
+ IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, errorInputTopic, errorOutputTopic, inputTopic, outputTopic);
+ }
+
+ private Properties props() {
+ return mkObjectProperties(
+ mkMap(
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()),
+ mkEntry(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0),
+ mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L),
+ mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
+ mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class))
+ );
+ }
+
+ @Test
+ public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {
+ final AtomicInteger noOutputExpected = new AtomicInteger(0);
+ final AtomicInteger outputExpected = new AtomicInteger(0);
+
+ try (final KafkaStreamsNamedTopologyWrapper kafkaStreams = new KafkaStreamsNamedTopologyWrapper(properties)) {
+ kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
+
+ final NamedTopologyBuilder builder = kafkaStreams.newNamedTopologyBuilder("topology_A");
+ builder.stream(inputTopic).peek((k, v) -> outputExpected.incrementAndGet()).to(outputTopic);
+ builder.stream(errorInputTopic)
+ .peek((k, v) -> {
+ throw new RuntimeException("Kaboom");
+ })
+ .peek((k, v) -> noOutputExpected.incrementAndGet())
+ .to(errorOutputTopic);
+
+ kafkaStreams.addNamedTopology(builder.build());
+
+ StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ errorInputTopic,
+ Arrays.asList(
+ new KeyValue<>(1, "A")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ 0L);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputTopic,
+ Arrays.asList(
+ new KeyValue<>(1, "A"),
+ new KeyValue<>(1, "B")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ 0L);
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ StringDeserializer.class
+ ),
+ outputTopic,
+ Arrays.asList(
+ new KeyValue<>(1, "A"),
+ new KeyValue<>(1, "B")
+ )
+ );
+ assertThat(noOutputExpected.get(), equalTo(0));
+ assertThat(outputExpected.get(), equalTo(2));
+ }
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index baaf06c..c94066c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -144,7 +144,7 @@ public class FineGrainedAutoResetIntegrationTest {
public void setUp() throws IOException {
final Properties props = new Properties();
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -283,7 +283,7 @@ public class FineGrainedAutoResetIntegrationTest {
@Test
public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException {
final Properties props = new Properties();
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 097a79f..2a1fedf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -132,7 +132,7 @@ public class GlobalKTableEOSIntegrationTest {
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
streamsConfiguration.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 90dc9e7..4cdd172 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -110,7 +110,7 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 98dec87..31658bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -119,7 +119,7 @@ public class GlobalThreadShutDownOrderTest {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 29c61ec..3abe088 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -104,7 +104,7 @@ public class InternalTopicIntegrationTest {
streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
- streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsProp.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 4fe35a6..79a6975 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -107,7 +107,7 @@ public class KStreamAggregationDedupIntegrationTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index e581903..3990c67 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -142,7 +142,7 @@ public class KStreamAggregationIntegrationTest {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index 1e7f685..4730f5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -138,7 +138,7 @@ public class KStreamRepartitionIntegrationTest {
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index c83bbae..e0bed5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -181,6 +181,7 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
assertEquals(expectedResult, result);
}
+ @SuppressWarnings("deprecation")
private static Properties getStreamsConfig() {
final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Partitioner");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
index 0788b52..ed48cf8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
@@ -208,7 +208,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Multi");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
return streamsConfig;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 6d50ea9..8d4ac2c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -69,7 +69,7 @@ public class KTableSourceTopicRestartIntegrationTest {
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ STREAMS_CONFIG.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5L);
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 9ada60f..b2c2e4d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -195,6 +195,8 @@ public class MetricsIntegrationTest {
private static final String THREAD_START_TIME = "thread-start-time";
private static final String ACTIVE_PROCESS_RATIO = "active-process-ratio";
private static final String ACTIVE_BUFFER_COUNT = "active-buffer-count";
+ private static final String INPUT_BUFFER_BYTES_TOTAL = "input-buffer-bytes-total";
+ private static final String CACHE_SIZE_BYTES_TOTAL = "cache-size-bytes-total";
private static final String SKIPPED_RECORDS_RATE = "skipped-records-rate";
private static final String SKIPPED_RECORDS_TOTAL = "skipped-records-total";
private static final String RECORD_LATENESS_AVG = "record-lateness-avg";
@@ -251,7 +253,7 @@ public class MetricsIntegrationTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
}
@@ -527,6 +529,8 @@ public class MetricsIntegrationTest {
checkMetricByName(listMetricTask, PUNCTUATE_TOTAL, 4);
checkMetricByName(listMetricTask, PROCESS_RATE, 4);
checkMetricByName(listMetricTask, PROCESS_TOTAL, 4);
+ checkMetricByName(listMetricTask, INPUT_BUFFER_BYTES_TOTAL, 4);
+ checkMetricByName(listMetricTask, CACHE_SIZE_BYTES_TOTAL, 3);
}
private void checkProcessorNodeLevelMetrics() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index 0e367d9..75bf3a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -813,7 +813,7 @@ public class NamedTopologyIntegrationTest {
try {
final AtomicInteger noOutputExpected = new AtomicInteger(0);
final AtomicInteger outputExpected = new AtomicInteger(0);
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L);
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index 44744cd..f9ab66c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -198,7 +198,7 @@ public class OptimizedKTableIntegrationTest {
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
- config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ config.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 15b9ea6..52a8966 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -964,7 +964,7 @@ public class QueryableStateIntegrationTest {
}
private void verifyCanQueryState(final int cacheSizeBytes) throws Exception {
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSizeBytes);
final StreamsBuilder builder = new StreamsBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 1bfb1b6..99cb358 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -129,7 +129,7 @@ public class RegexSourceIntegrationTest {
public void setUp() throws InterruptedException {
outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());
final Properties properties = new Properties();
- properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 2c0e180..b5aad95a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -123,7 +123,7 @@ public class RestoreIntegrationTest {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index c698d06..725b386 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -198,7 +198,7 @@ public class RocksDBMetricsIntegrationTest {
streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
return streamsConfiguration;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index 4fbe734..c8da3bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -400,7 +400,7 @@ public class StandbyTaskEOSIntegrationTest {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index 6c6fc5d..17df2b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -94,7 +94,7 @@ public class StoreUpgradeIntegrationTest {
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
index 512d1c1..8d023b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
@@ -121,7 +121,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest {
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 0974ed6..49e7913 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -51,7 +51,7 @@ import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"unchecked"})
public class KTableFilterTest {
private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer());
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
@@ -59,7 +59,7 @@ public class KTableFilterTest {
@Before
public void setUp() {
// disable caching at the config level
- props.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
+ props.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
}
private final Predicate<String, Integer> predicate = (key, value) -> (value % 2) == 0;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 8c7d179..a77dcdb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -70,7 +70,7 @@ public class SessionWindowedKStreamImplTest {
@Test
public void shouldCountSessionWindowedWithCachingDisabled() {
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
shouldCountSessionWindowed();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index f185f1a..2b273ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -941,9 +941,22 @@ public class InternalTopologyBuilderTest {
}
@Test
+ @SuppressWarnings("deprecation") // presumably for CACHE_MAX_BYTES_BUFFERING_CONFIG, which the test name treats as the deprecated key
+ public void shouldUseNonDeprecatedConfigToSetCacheBytesWhenBothDeprecatedAndNonDeprecatedConfigsUsed() {
+ final Properties globalProps = StreamsTestUtils.getStreamsConfig();
+ globalProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 200L); // the value expected to be picked up below
+ globalProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100L); // a different value under the other key, which must not be picked up
+ final StreamsConfig globalStreamsConfig = new StreamsConfig(globalProps);
+ final InternalTopologyBuilder topologyBuilder = builder.rewriteTopology(globalStreamsConfig);
+ assertThat(topologyBuilder.topologyConfigs(), equalTo(new TopologyConfig(null, globalStreamsConfig, new Properties())));
+ assertThat(topologyBuilder.topologyConfigs().cacheSize, equalTo(200L)); // 200L is the STATESTORE_CACHE_MAX_BYTES_CONFIG value set above
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() {
final Properties topologyOverrides = new Properties();
- topologyOverrides.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 12345L);
+ topologyOverrides.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 12345L);
topologyOverrides.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
topologyOverrides.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L);
topologyOverrides.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15);
@@ -969,9 +982,10 @@ public class InternalTopologyBuilderTest {
}
@Test
+ @SuppressWarnings("deprecation")
public void shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() {
final Properties streamsProps = StreamsTestUtils.getStreamsConfig();
- streamsProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 12345L);
+ streamsProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 12345L);
streamsProps.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
streamsProps.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L);
streamsProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 40602b5..389d0d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -19,9 +19,12 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -44,6 +47,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
+import java.util.Collections;
+import java.util.Optional;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -83,6 +88,7 @@ public class PartitionGroupTest {
private final Metrics metrics = new Metrics();
private final Sensor enforcedProcessingSensor = metrics.sensor(UUID.randomUUID().toString());
private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap());
+ private final MetricName totalBytesValue = new MetricName("total-bytes-last-value", "", "", mkMap());
private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) {
@@ -484,6 +490,7 @@ public class PartitionGroupTest {
mkMap(mkEntry(partition1, queue1)),
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
+ getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
maxTaskIdleMs
);
@@ -517,6 +524,7 @@ public class PartitionGroupTest {
mkMap(mkEntry(partition1, queue1)),
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
+ getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
maxTaskIdleMs
);
@@ -552,6 +560,7 @@ public class PartitionGroupTest {
),
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
+ getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
StreamsConfig.MAX_TASK_IDLE_MS_DISABLED
);
@@ -590,6 +599,7 @@ public class PartitionGroupTest {
),
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
+ getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
0L
);
@@ -629,6 +639,7 @@ public class PartitionGroupTest {
),
tp -> lags.getOrDefault(tp, OptionalLong.empty()),
getValueSensor(metrics, lastLatenessValue),
+ getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
0L
);
@@ -665,6 +676,7 @@ public class PartitionGroupTest {
),
tp -> lags.getOrDefault(tp, OptionalLong.empty()),
getValueSensor(metrics, lastLatenessValue),
+ getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
0L
);
@@ -701,6 +713,7 @@ public class PartitionGroupTest {
),
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
+ getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
1L
);
@@ -763,6 +776,93 @@ public class PartitionGroupTest {
}
}
+ @Test
+ public void shouldUpdateTotalBytesBufferedOnRecordsAdditionAndConsumption() {
+ final PartitionGroup group = getBasicGroup();
+ // checks totalBytesBuffered() and the totalBytesValue metric after every addRawRecords()/nextRecord() call
+ assertEquals(0, group.numBuffered());
+ assertEquals(0L, group.totalBytesBuffered());
+
+ // add three records to partition-1 (offsets 1, 5, 3; the assertions below expect these as the extracted timestamps)
+ final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+ new ConsumerRecord<>("topic", 1, 1L, new MockTime().milliseconds(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 5L, new MockTime().milliseconds(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 3L, new MockTime().milliseconds(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
+
+ long partition1TotalBytes = getBytesBufferedForRawRecords(list1);
+ group.addRawRecords(partition1, list1);
+
+ verifyBuffered(3, 3, 0, group);
+ assertEquals(partition1TotalBytes, group.totalBytesBuffered());
+ assertEquals(-1L, group.streamTime());
+ assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
+ assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition1TotalBytes));
+
+ StampedRecord record;
+ final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
+
+ // get first two records from partition 1
+ record = group.nextRecord(info, time.milliseconds());
+ assertEquals(1L, record.timestamp);
+ record = group.nextRecord(info, time.milliseconds());
+ assertEquals(5L, record.timestamp);
+
+ partition1TotalBytes -= getBytesBufferedForRawRecords(Arrays.asList(list1.get(0), list1.get(1))); // the two records consumed above
+ assertEquals(partition1TotalBytes, group.totalBytesBuffered());
+ assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition1TotalBytes));
+
+ // add three records to partition-2 (offsets 2, 4, 6)
+ final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
+ new ConsumerRecord<>("topic", 2, 2L, record.timestamp, TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 2, 4L, record.timestamp, TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 2, 6L, record.timestamp, TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
+
+ long partition2TotalBytes = getBytesBufferedForRawRecords(list2);
+ group.addRawRecords(partition2, list2);
+ // 1:[3]
+ // 2:[2, 4, 6]
+ assertEquals(partition2TotalBytes + partition1TotalBytes, group.totalBytesBuffered());
+ assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition2TotalBytes + partition1TotalBytes));
+
+ // get one record, next record should be ts=2 from partition 2
+ record = group.nextRecord(info, time.milliseconds());
+ // 1:[3]
+ // 2:[4, 6]
+ partition2TotalBytes -= getBytesBufferedForRawRecords(Collections.singletonList(list2.get(0)));
+ assertEquals(partition2TotalBytes + partition1TotalBytes, group.totalBytesBuffered());
+ assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition2TotalBytes + partition1TotalBytes));
+ assertEquals(2L, record.timestamp);
+
+ // get one record, next up should have ts=3 from partition 1 (even though it has seen a larger max timestamp = 5)
+ record = group.nextRecord(info, time.milliseconds());
+ // 1:[]
+ // 2:[4, 6]
+ partition1TotalBytes -= getBytesBufferedForRawRecords(Collections.singletonList(list1.get(2))); // the partition-1 record just consumed
+ assertEquals(partition2TotalBytes + partition1TotalBytes, group.totalBytesBuffered());
+ assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition2TotalBytes + partition1TotalBytes));
+ assertEquals(3L, record.timestamp);
+ }
+
+ private long getBytesBufferedForRawRecords(final List<ConsumerRecord<byte[], byte[]>> rawRecords) {
+ // expected buffered size: key + value + timestamp + offset + topic + partition + headers, summed over rawRecords
+ long rawRecordsSizeInBytes = 0L;
+ for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+ long headerSizeInBytes = 0L;
+ for (final Header header : rawRecord.headers().toArray()) {
+ // NOTE(review): a null header value is counted as 0 bytes instead of throwing a NullPointerException; confirm the production accounting agrees
+ headerSizeInBytes += header.key().getBytes(java.nio.charset.StandardCharsets.UTF_8).length + (header.value() == null ? 0 : header.value().length);
+ }
+ rawRecordsSizeInBytes += rawRecord.serializedKeySize() +
+ rawRecord.serializedValueSize() +
+ 8L + // timestamp
+ 8L + // offset
+ rawRecord.topic().getBytes(java.nio.charset.StandardCharsets.UTF_8).length + // explicit charset so the result does not depend on the platform default
+ 4L + // partition
+ headerSizeInBytes;
+ }
+ return rawRecordsSizeInBytes;
+ }
+
private PartitionGroup getBasicGroup() {
return new PartitionGroup(
logContext,
@@ -773,6 +873,7 @@ public class PartitionGroupTest {
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
enforcedProcessingSensor,
+ getValueSensor(metrics, totalBytesValue),
maxTaskIdleMs
);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index 251a263..7d0e716 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -110,7 +110,7 @@ public class RepartitionOptimizingTest {
@Before
public void setUp() {
streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10));
+ streamsConfiguration.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Integer.toString(1024 * 10));
streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000));
processorValueCollector.clear();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
index b388a6a..bc04505 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
@@ -86,7 +86,7 @@ public class RepartitionWithMergeOptimizingTest {
@Before
public void setUp() {
streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10));
+ streamsConfiguration.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Integer.toString(1024 * 10));
streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 9a66f27..b6bb3b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -108,7 +108,6 @@ public class StandbyTaskTest {
return new StreamsConfig(mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
- mkEntry(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()),
mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName())
)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 71537ae..be99d73 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -230,6 +230,7 @@ public class StreamTaskTest {
return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName());
}
+ @SuppressWarnings("deprecation")
private static StreamsConfig createConfig(
final String eosConfig,
final String enforcedProcessingValue,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c0cf9a2..af02e55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -122,6 +122,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
@@ -161,6 +162,7 @@ public class StreamThreadTest {
private final StateDirectory stateDirectory = new StateDirectory(config, mockTime, true, false);
private final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
+ private final long defaultMaxBufferSizeInBytes = 512 * 1024 * 1024;
private StreamsMetadataState streamsMetadataState;
private final static BiConsumer<Throwable, Boolean> HANDLER = (e, b) -> {
@@ -200,7 +202,6 @@ public class StreamThreadTest {
return mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
- mkEntry(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"),
mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? StreamsConfig.EXACTLY_ONCE_V2 : StreamsConfig.AT_LEAST_ONCE),
@@ -251,6 +252,7 @@ public class StreamThreadTest {
mockTime,
streamsMetadataState,
0,
+ defaultMaxBufferSizeInBytes,
stateDirectory,
new MockStateRestoreListener(),
threadIdx,
@@ -528,6 +530,7 @@ public class StreamThreadTest {
mockTime,
streamsMetadataState,
0,
+ defaultMaxBufferSizeInBytes,
stateDirectory,
new MockStateRestoreListener(),
threadIdx,
@@ -1200,7 +1203,8 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
- null
+ null,
+ defaultMaxBufferSizeInBytes
).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
final StreamsException thrown = assertThrows(StreamsException.class, thread::run);
@@ -1587,6 +1591,7 @@ public class StreamThreadTest {
mockTime,
streamsMetadataState,
0,
+ defaultMaxBufferSizeInBytes,
stateDirectory,
new MockStateRestoreListener(),
threadIdx,
@@ -2237,7 +2242,8 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
- null
+ null,
+ defaultMaxBufferSizeInBytes
) {
@Override
void runOnce() {
@@ -2304,7 +2310,8 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
- null
+ null,
+ defaultMaxBufferSizeInBytes
) {
@Override
void runOnce() {
@@ -2379,7 +2386,8 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
- null
+ null,
+ defaultMaxBufferSizeInBytes
) {
@Override
void runOnce() {
@@ -2449,7 +2457,8 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
- null
+ null,
+ defaultMaxBufferSizeInBytes
) {
@Override
void runOnce() {
@@ -2517,7 +2526,8 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
- null
+ null,
+ defaultMaxBufferSizeInBytes
) {
@Override
void runOnce() {
@@ -2671,6 +2681,205 @@ public class StreamThreadTest {
}
@Test
+ public void shouldPauseNonEmptyPartitionsWhenTotalBufferSizeExceedsMaxBufferSize() {
+ final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+ final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+ expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
+ final List<TopicPartition> assignedPartitions = Collections.singletonList(t1p1);
+ consumer.assign(assignedPartitions);
+ records.put(t1p1, Collections.singletonList(new ConsumerRecord<>(
+ t1p1.topic(),
+ t1p1.partition(),
+ 1,
+ mockTime.milliseconds(),
+ TimestampType.CREATE_TIME,
+ 2,
+ 6,
+ new byte[2],
+ new byte[6],
+ new RecordHeaders(),
+ Optional.empty())));
+ expect(consumer.poll(anyObject())).andReturn(new ConsumerRecords<>(records)).anyTimes();
+ EasyMock.replay(consumer, consumerGroupMetadata);
+ final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+
+ final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
+ final Metric testMetric = new KafkaMetric(
+ new Object(),
+ testMetricName,
+ (Measurable) (config, now) -> 0,
+ null,
+ new MockTime());
+ final Map<MetricName, Metric> dummyProducerMetrics = singletonMap(testMetricName, testMetric);
+
+ expect(taskManager.producerMetrics()).andReturn(dummyProducerMetrics);
+ EasyMock.replay(taskManager);
+
+ final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
+ topologyMetadata.buildAndRewriteTopology();
+
+ final StreamsMetricsImpl streamsMetrics =
+ new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+ final StreamThread thread = new StreamThread(
+ mockTime,
+ config,
+ null,
+ consumer,
+ consumer,
+ changelogReader,
+ null,
+ taskManager,
+ streamsMetrics,
+ topologyMetadata,
+ CLIENT_ID,
+ new LogContext(""),
+ new AtomicInteger(),
+ new AtomicLong(Long.MAX_VALUE),
+ new LinkedList<>(),
+ null,
+ HANDLER,
+ null,
+ 10
+ );
+ thread.setState(StreamThread.State.STARTING);
+ thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
+ thread.pollPhase();
+ thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+ thread.pollPhase();
+ EasyMock.reset(consumer);
+ consumer.pause(anyObject());
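+ // Consumer.pause should be called only once, when we added the second record. NOTE(review): this expectation is recorded after reset() and is never replayed or verified, so it currently asserts nothing.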
+ EasyMock.expectLastCall().times(1);
+ }
+
+ @Test
+ public void shouldResumePartitionsAfterConsumptionWhenTotalBufferSizeIsLTEMaxBufferSize() {
+ final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+ final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ final ChangelogReader changelogReader = EasyMock.createNiceMock(ChangelogReader.class);
+ expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+ expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+ changelogReader.restore(anyObject());
+ expectLastCall().andVoid();
+
+ final Task task1 = mock(Task.class);
+ final Task task2 = mock(Task.class);
+
+ final TaskId taskId1 = new TaskId(0, 1);
+ final TaskId taskId2 = new TaskId(0, 2);
+
+ expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
+ expect(task1.id()).andReturn(taskId1).anyTimes();
+ expect(task1.inputPartitions()).andReturn(mkSet(t1p1)).anyTimes();
+ expect(task1.committedOffsets()).andReturn(new HashMap<>()).anyTimes();
+ expect(task1.highWaterMark()).andReturn(new HashMap<>()).anyTimes();
+ expect(task1.timeCurrentIdlingStarted()).andReturn(Optional.empty()).anyTimes();
+
+ expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
+ expect(task2.id()).andReturn(taskId2).anyTimes();
+ expect(task2.inputPartitions()).andReturn(mkSet(t1p2)).anyTimes();
+ expect(task2.committedOffsets()).andReturn(new HashMap<>()).anyTimes();
+ expect(task2.highWaterMark()).andReturn(new HashMap<>()).anyTimes();
+ expect(task2.timeCurrentIdlingStarted()).andReturn(Optional.empty()).anyTimes();
+ EasyMock.replay(task1, task2);
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
+ records.put(t1p1, Collections.singletonList(new ConsumerRecord<>(
+ t1p1.topic(),
+ t1p1.partition(),
+ 1,
+ mockTime.milliseconds(),
+ TimestampType.CREATE_TIME,
+ 2,
+ 6,
+ new byte[2],
+ new byte[6],
+ new RecordHeaders(),
+ Optional.empty())));
+ records.put(t1p2, Collections.singletonList(new ConsumerRecord<>(
+ t1p2.topic(),
+ t1p2.partition(),
+ 1,
+ mockTime.milliseconds(),
+ TimestampType.CREATE_TIME,
+ 2,
+ 6,
+ new byte[2],
+ new byte[6],
+ new RecordHeaders(),
+ Optional.empty())));
+
+ final List<TopicPartition> assignedPartitions = Arrays.asList(t1p1, t1p2);
+ consumer.assign(assignedPartitions);
+ expect(consumer.poll(anyObject())).andReturn(new ConsumerRecords<>(records));
+ EasyMock.replay(consumer, consumerGroupMetadata);
+
+ final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+
+ final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
+ final Metric testMetric = new KafkaMetric(
+ new Object(),
+ testMetricName,
+ (Measurable) (config, now) -> 0,
+ null,
+ new MockTime());
+ final Map<MetricName, Metric> dummyProducerMetrics = singletonMap(testMetricName, testMetric);
+ expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+ expect(taskManager.producerMetrics()).andReturn(dummyProducerMetrics);
+ expect(taskManager.activeTaskMap()).andReturn(mkMap(
+ mkEntry(taskId1, task1),
+ mkEntry(taskId2, task2)
+ ));
+ expect(taskManager.tasks()).andStubReturn(mkMap(
+ mkEntry(taskId1, task1),
+ mkEntry(taskId2, task2)
+ ));
+ expect(taskManager.standbyTaskMap()).andReturn(new HashMap<>());
+ expect(taskManager.commit(anyObject())).andReturn(0);
+ expect(taskManager.process(anyInt(), anyObject())).andReturn(1);
+ expect(taskManager.process(anyInt(), anyObject())).andReturn(1);
+ expect(taskManager.process(anyInt(), anyObject())).andReturn(0);
+
+ EasyMock.replay(taskManager);
+
+ final StreamsMetricsImpl streamsMetrics =
+ new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+ final StreamThread thread = new StreamThread(
+ mockTime,
+ new StreamsConfig(configProps(true)),
+ null,
+ consumer,
+ consumer,
+ changelogReader,
+ null,
+ taskManager,
+ streamsMetrics,
+ new TopologyMetadata(internalTopologyBuilder, config),
+ CLIENT_ID,
+ new LogContext(""),
+ new AtomicInteger(),
+ new AtomicLong(Long.MAX_VALUE),
+ new LinkedList<>(),
+ null,
+ HANDLER,
+ null,
+ 6
+ ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+ thread.setState(StreamThread.State.STARTING);
+ thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
+ thread.setState(StreamThread.State.RUNNING);
+
+ thread.runOnce();
+ EasyMock.reset(consumer);
+ consumer.resume(anyObject());
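+ // Consumer.resume should be called only once, after records were consumed and the total buffer size is at or below the max buffer size. NOTE(review): this expectation is recorded after reset() and is never replayed or verified.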
+ EasyMock.expectLastCall().times(1);
+ }
+
+ @Test
public void shouldTransmitTaskManagerMetrics() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
@@ -2738,7 +2947,8 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
- null
+ null,
+ defaultMaxBufferSizeInBytes
);
final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
final Metric testMetric = new KafkaMetric(
@@ -2795,7 +3005,8 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
(e, b) -> { },
- null
+ null,
+ defaultMaxBufferSizeInBytes
) {
@Override
void runOnce() {
@@ -2930,7 +3141,8 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
- null
+ null,
+ defaultMaxBufferSizeInBytes
);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
index 1d33fea..cababb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
@@ -85,6 +85,47 @@ public class TaskMetricsTest {
}
@Test
+ public void shouldGetTotalBytesSensor() {
+ final String operation = "input-buffer-bytes-total";
+ when(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, operation, RecordingLevel.INFO))
+ .thenReturn(expectedSensor);
+ final String totalBytesDescription = "The total number of bytes accumulated in this task's input buffer";
+ when(streamsMetrics.taskLevelTagMap(THREAD_ID, TASK_ID)).thenReturn(tagMap);
+ StreamsMetricsImpl.addValueMetricToSensor(
+ expectedSensor,
+ TASK_LEVEL_GROUP,
+ tagMap,
+ operation,
+ totalBytesDescription
+ );
+
+
+ final Sensor sensor = TaskMetrics.totalBytesSensor(THREAD_ID, TASK_ID, streamsMetrics);
+
+ assertThat(sensor, is(expectedSensor));
+ }
+
+ @Test
+ public void shouldGetTotalCacheSizeInBytesSensor() {
+ final String operation = "cache-size-bytes-total";
+ when(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, operation, RecordingLevel.INFO))
+ .thenReturn(expectedSensor);
+ final String totalBytesDescription = "The total size in bytes of this task's cache.";
+ when(streamsMetrics.taskLevelTagMap(THREAD_ID, TASK_ID)).thenReturn(tagMap);
+ StreamsMetricsImpl.addValueMetricToSensor(
+ expectedSensor,
+ TASK_LEVEL_GROUP,
+ tagMap,
+ operation,
+ totalBytesDescription
+ );
+
+ final Sensor sensor = TaskMetrics.totalCacheSizeBytesSensor(THREAD_ID, TASK_ID, streamsMetrics);
+
+ assertThat(sensor, is(expectedSensor));
+ }
+
+ @Test
public void shouldGetProcessLatencySensor() {
final String operation = "process-latency";
when(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, operation, RecordingLevel.DEBUG))
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 6d43b4c..4d2022c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -32,6 +33,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -43,11 +48,12 @@ public class NamedCacheTest {
private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
private NamedCache cache;
+ private final Metrics innerMetrics = new Metrics();
+ private final StreamsMetricsImpl metrics = new MockStreamsMetrics(innerMetrics);
+ private final MetricName cacheSizeBytesTotal = new MetricName("cache-size-bytes-total", "stream-task-metrics", "", mkMap(mkEntry("thread-id", "Test worker"), mkEntry("task-id", "dummy")));
@Before
public void setUp() {
- final Metrics innerMetrics = new Metrics();
- final StreamsMetricsImpl metrics = new MockStreamsMetrics(innerMetrics);
cache = new NamedCache("dummy-name", metrics);
}
@@ -82,6 +88,7 @@ public class NamedCacheTest {
cache.put(Bytes.wrap(new byte[]{1}), value);
cache.put(Bytes.wrap(new byte[]{2}), value);
final long size = cache.sizeInBytes();
+ assertThat(metrics.metrics().get(cacheSizeBytesTotal).metricValue(), is((double) size));
// 1 byte key + 24 bytes overhead
assertEquals((value.size() + 25) * 3, size);
}
@@ -114,6 +121,7 @@ public class NamedCacheTest {
final LRUCacheEntry deleted = cache.delete(Bytes.wrap(new byte[]{0}));
assertArrayEquals(new byte[] {10}, deleted.value());
assertEquals(0, cache.sizeInBytes());
+ assertThat(metrics.metrics().get(cacheSizeBytesTotal).metricValue(), is((double) 0));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 8a402ab..1e06b4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -72,7 +72,7 @@ public class BrokerCompatibilityTest {
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
- streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingMode);
final int timeout = 6000;
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 3b1aa44..ae9d752 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -106,7 +106,7 @@ public class EosTestClient extends SmokeTestUtil {
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, Duration.ofMinutes(1).toMillis());
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, Integer.MAX_VALUE);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L); // increase commit interval to make sure a client is killed having an open transaction
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
index b98f861..af3614c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
@@ -87,7 +87,7 @@ public class StreamsNamedRepartitionTest {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsNamedRepartitionTest");
- config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
+ config.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
index 714aa11..95945b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
@@ -110,7 +110,7 @@ public class StreamsOptimizedTest {
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsOptimizedTest");
- config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
+ config.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), "100");
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
index 3c693cc..2568b49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -67,7 +67,7 @@ public class StreamsStandByReplicaTest {
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
- streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index dba1cbc..d438a44 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -155,7 +155,7 @@ import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
*
* <p> Note that the {@code TopologyTestDriver} processes input records synchronously.
* This implies that {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit.interval.ms} and
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache.max.bytes.buffering} configuration have no effect.
+ * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG statestore.cache.max.bytes} configuration have no effect.
* The driver behaves as if both configs would be set to zero, i.e., as if a "commit" (and thus "flush") would happen
* after each input record.
*
@@ -309,6 +309,7 @@ public class TopologyTestDriver implements Closeable {
* @param config the configuration for the topology
* @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
*/
+ @SuppressWarnings({"unchecked", "deprecation"})
private TopologyTestDriver(final InternalTopologyBuilder builder,
final Properties config,
final long initialWallClockTimeMs) {
@@ -329,7 +330,7 @@ public class TopologyTestDriver implements Closeable {
final ThreadCache cache = new ThreadCache(
logContext,
- Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
+ Math.max(0, streamsConfig.getTotalCacheSize()),
streamsMetrics
);