You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2021/06/03 18:03:07 UTC
[kafka] branch trunk updated: KAFKA-12749: Changelog topic config on suppressed KTable lost (#10664)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 93dca8e KAFKA-12749: Changelog topic config on suppressed KTable lost (#10664)
93dca8e is described below
commit 93dca8ebd973f7a49280aa216137f685c00030e3
Author: Viswanathan Ranganathan <vi...@gmail.com>
AuthorDate: Thu Jun 3 11:00:19 2021 -0700
KAFKA-12749: Changelog topic config on suppressed KTable lost (#10664)
Refactored the buffer configs so that logConfig is passed along when using shutDownWhenFull or emitEarlyWhenFull, and when StrictBufferConfigImpl's withMaxRecords or withMaxBytes is called. Removed the constructors that don't accept a logConfig parameter, so callers are forced to specify it explicitly, even when it is empty.
Co-authored-by: Bruno Cadonna <ca...@apache.org>
Reviewers: Walker Carlson <wc...@confluent.io>, Bruno Cadonna <ca...@apache.org>
---
.../apache/kafka/streams/kstream/Suppressed.java | 5 +-
.../internals/suppress/BufferConfigInternal.java | 7 +-
.../internals/suppress/EagerBufferConfigImpl.java | 22 +++--
.../internals/suppress/StrictBufferConfigImpl.java | 21 ++---
.../kafka/streams/kstream/SuppressedTest.java | 93 ++++++++++++++++++++--
5 files changed, 111 insertions(+), 37 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 2f96993..31a53ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImp
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import java.time.Duration;
+import java.util.Collections;
import java.util.Map;
public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
@@ -48,7 +49,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
* Create a size-constrained buffer in terms of the maximum number of keys it will store.
*/
static EagerBufferConfig maxRecords(final long recordLimit) {
- return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
+ return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE, Collections.emptyMap());
}
/**
@@ -60,7 +61,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
* Create a size-constrained buffer in terms of the maximum number of bytes it will use.
*/
static EagerBufferConfig maxBytes(final long byteLimit) {
- return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
+ return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit, Collections.emptyMap());
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
index 74de6ef..800a2a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
@@ -35,18 +35,19 @@ public abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC
return new StrictBufferConfigImpl(
Long.MAX_VALUE,
Long.MAX_VALUE,
- SHUT_DOWN // doesn't matter, given the bounds
+ SHUT_DOWN, // doesn't matter, given the bounds
+ getLogConfig()
);
}
@Override
public Suppressed.StrictBufferConfig shutDownWhenFull() {
- return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN);
+ return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN, getLogConfig());
}
@Override
public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
- return new EagerBufferConfigImpl(maxRecords(), maxBytes());
+ return new EagerBufferConfigImpl(maxRecords(), maxBytes(), getLogConfig());
}
public abstract boolean isLoggingEnabled();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index c56532d..7665e66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -28,15 +28,9 @@ public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.Eager
private final long maxBytes;
private final Map<String, String> logConfig;
- public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) {
- this.maxRecords = maxRecords;
- this.maxBytes = maxBytes;
- this.logConfig = Collections.emptyMap();
- }
-
- private EagerBufferConfigImpl(final long maxRecords,
- final long maxBytes,
- final Map<String, String> logConfig) {
+ public EagerBufferConfigImpl(final long maxRecords,
+ final long maxBytes,
+ final Map<String, String> logConfig) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.logConfig = logConfig;
@@ -97,16 +91,20 @@ public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.Eager
}
final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
return maxRecords == that.maxRecords &&
- maxBytes == that.maxBytes;
+ maxBytes == that.maxBytes &&
+ Objects.equals(getLogConfig(), that.getLogConfig());
}
@Override
public int hashCode() {
- return Objects.hash(maxRecords, maxBytes);
+ return Objects.hash(maxRecords, maxBytes, getLogConfig());
}
@Override
public String toString() {
- return "EagerBufferConfigImpl{maxRecords=" + maxRecords + ", maxBytes=" + maxBytes + '}';
+ return "EagerBufferConfigImpl{maxRecords=" + maxRecords +
+ ", maxBytes=" + maxBytes +
+ ", logConfig=" + getLogConfig() +
+ "}";
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
index 13ffccd..2ca5ef9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
@@ -41,14 +41,6 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
this.logConfig = logConfig;
}
- public StrictBufferConfigImpl(final long maxRecords,
- final long maxBytes,
- final BufferFullStrategy bufferFullStrategy) {
- this.maxRecords = maxRecords;
- this.maxBytes = maxBytes;
- this.bufferFullStrategy = bufferFullStrategy;
- this.logConfig = Collections.emptyMap();
- }
public StrictBufferConfigImpl() {
this.maxRecords = Long.MAX_VALUE;
@@ -59,12 +51,12 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
@Override
public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) {
- return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy);
+ return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy, getLogConfig());
}
@Override
public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
- return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy);
+ return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy, getLogConfig());
}
@Override
@@ -113,18 +105,21 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o;
return maxRecords == that.maxRecords &&
maxBytes == that.maxBytes &&
- bufferFullStrategy == that.bufferFullStrategy;
+ bufferFullStrategy == that.bufferFullStrategy &&
+ Objects.equals(getLogConfig(), ((StrictBufferConfigImpl) o).getLogConfig());
}
@Override
public int hashCode() {
- return Objects.hash(maxRecords, maxBytes, bufferFullStrategy);
+ return Objects.hash(maxRecords, maxBytes, bufferFullStrategy, getLogConfig());
}
@Override
public String toString() {
return "StrictBufferConfigImpl{maxKeys=" + maxRecords +
", maxBytes=" + maxBytes +
- ", bufferFullStrategy=" + bufferFullStrategy + '}';
+ ", bufferFullStrategy=" + bufferFullStrategy +
+ ", logConfig=" + getLogConfig().toString() +
+ '}';
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
index 112b9eb..b799884 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImp
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.junit.Test;
+import java.util.Collections;
+
import static java.lang.Long.MAX_VALUE;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
@@ -46,13 +48,19 @@ public class SuppressedTest {
assertThat(
"keys alone should be set",
maxRecords(2L),
- is(new EagerBufferConfigImpl(2L, MAX_VALUE))
+ is(new EagerBufferConfigImpl(2L, MAX_VALUE, Collections.emptyMap()))
);
assertThat(
"size alone should be set",
maxBytes(2L),
- is(new EagerBufferConfigImpl(MAX_VALUE, 2L))
+ is(new EagerBufferConfigImpl(MAX_VALUE, 2L, Collections.emptyMap()))
+ );
+
+ assertThat(
+ "config should be set even after max records",
+ maxRecords(2L).withMaxBytes(4L).withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")),
+ is(new EagerBufferConfigImpl(2L, 4L, Collections.singletonMap("myConfigKey", "myConfigValue")))
);
}
@@ -91,7 +99,13 @@ public class SuppressedTest {
assertThat(
"all constraints should be set",
untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)),
- is(new SuppressedInternal<>(null, ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null, false))
+ is(new SuppressedInternal<>(null, ofMillis(2), new EagerBufferConfigImpl(3L, 2L, Collections.emptyMap()), null, false))
+ );
+
+ assertThat(
+ "config is not lost early emit is set",
+ untilTimeLimit(ofMillis(2), maxRecords(2L).withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")).emitEarlyWhenFull()),
+ is(new SuppressedInternal<>(null, ofMillis(2), new EagerBufferConfigImpl(2L, MAX_VALUE, Collections.singletonMap("myConfigKey", "myConfigValue")), null, false))
);
}
@@ -105,13 +119,13 @@ public class SuppressedTest {
assertThat(
untilWindowCloses(maxRecords(2L).shutDownWhenFull()),
- is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN))
+ is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN, Collections.emptyMap()))
)
);
assertThat(
untilWindowCloses(maxBytes(2L).shutDownWhenFull()),
- is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN))
+ is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.emptyMap()))
)
);
@@ -122,14 +136,79 @@ public class SuppressedTest {
assertThat(
untilWindowCloses(maxRecords(2L).shutDownWhenFull()).withName("name"),
- is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN))
+ is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN, Collections.emptyMap()))
)
);
assertThat(
untilWindowCloses(maxBytes(2L).shutDownWhenFull()).withName("name"),
- is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN))
+ is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.emptyMap()))
)
);
+
+ assertThat(
+ "config is not lost when shutdown when full is set",
+ untilWindowCloses(maxBytes(2L).withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")).shutDownWhenFull()),
+ is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.singletonMap("myConfigKey", "myConfigValue"))))
+ );
+ }
+
+ @Test
+ public void supportLongChainOfMethods() {
+ final Suppressed.BufferConfig<Suppressed.EagerBufferConfig> bufferConfig = unbounded()
+ .emitEarlyWhenFull()
+ .withMaxRecords(3L)
+ .withMaxBytes(4L)
+ .withMaxRecords(5L)
+ .withMaxBytes(6L);
+
+ assertThat(
+ "long chain of eager buffer config sets attributes properly",
+ bufferConfig,
+ is(new EagerBufferConfigImpl(5L, 6L, Collections.emptyMap()))
+ );
+ assertThat(
+ "long chain of strict buffer config sets attributes properly",
+ bufferConfig.shutDownWhenFull(),
+ is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN, Collections.emptyMap()))
+ );
+
+ final Suppressed.BufferConfig<Suppressed.EagerBufferConfig> bufferConfigWithLogging = unbounded()
+ .withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue"))
+ .emitEarlyWhenFull()
+ .withMaxRecords(3L)
+ .withMaxBytes(4L)
+ .withMaxRecords(5L)
+ .withMaxBytes(6L);
+
+ assertThat(
+ "long chain of eager buffer config sets attributes properly with logging enabled",
+ bufferConfigWithLogging,
+ is(new EagerBufferConfigImpl(5L, 6L, Collections.singletonMap("myConfigKey", "myConfigValue")))
+ );
+ assertThat(
+ "long chain of strict buffer config sets attributes properly with logging enabled",
+ bufferConfigWithLogging.shutDownWhenFull(),
+ is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN, Collections.singletonMap("myConfigKey", "myConfigValue")))
+ );
+
+ final Suppressed.BufferConfig<Suppressed.EagerBufferConfig> bufferConfigWithLoggingCalledAtTheEnd = unbounded()
+ .emitEarlyWhenFull()
+ .withMaxRecords(3L)
+ .withMaxBytes(4L)
+ .withMaxRecords(5L)
+ .withMaxBytes(6L)
+ .withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue"));
+
+ assertThat(
+ "long chain of eager buffer config sets logging even after other setters",
+ bufferConfigWithLoggingCalledAtTheEnd,
+ is(new EagerBufferConfigImpl(5L, 6L, Collections.singletonMap("myConfigKey", "myConfigValue")))
+ );
+ assertThat(
+ "long chain of strict buffer config sets logging even after other setters",
+ bufferConfigWithLoggingCalledAtTheEnd.shutDownWhenFull(),
+ is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN, Collections.singletonMap("myConfigKey", "myConfigValue")))
+ );
}
}