You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2021/06/03 18:03:07 UTC

[kafka] branch trunk updated: KAFKA-12749: Changelog topic config on suppressed KTable lost (#10664)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 93dca8e  KAFKA-12749: Changelog topic config on suppressed KTable lost (#10664)
93dca8e is described below

commit 93dca8ebd973f7a49280aa216137f685c00030e3
Author: Viswanathan Ranganathan <vi...@gmail.com>
AuthorDate: Thu Jun 3 11:00:19 2021 -0700

    KAFKA-12749: Changelog topic config on suppressed KTable lost (#10664)
    
    Refactored logConfig to be passed appropriately when using shutDownWhenFull or emitEarlyWhenFull. Removed the constructor that doesn't accept a logConfig parameter so you're forced to specify it explicitly, whether it's empty/unspecified or not.
    
    Co-authored-by: Bruno Cadonna <ca...@apache.org>
    
    Reviewers: Walker Carlson <wc...@confluent.io>, Bruno Cadonna <ca...@apache.org>
---
 .../apache/kafka/streams/kstream/Suppressed.java   |  5 +-
 .../internals/suppress/BufferConfigInternal.java   |  7 +-
 .../internals/suppress/EagerBufferConfigImpl.java  | 22 +++--
 .../internals/suppress/StrictBufferConfigImpl.java | 21 ++---
 .../kafka/streams/kstream/SuppressedTest.java      | 93 ++++++++++++++++++++--
 5 files changed, 111 insertions(+), 37 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 2f96993..31a53ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImp
 import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.Map;
 
 public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
@@ -48,7 +49,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
          * Create a size-constrained buffer in terms of the maximum number of keys it will store.
          */
         static EagerBufferConfig maxRecords(final long recordLimit) {
-            return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
+            return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE, Collections.emptyMap());
         }
 
         /**
@@ -60,7 +61,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
          * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
          */
         static EagerBufferConfig maxBytes(final long byteLimit) {
-            return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
+            return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit, Collections.emptyMap());
         }
 
         /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
index 74de6ef..800a2a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
@@ -35,18 +35,19 @@ public abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC
         return new StrictBufferConfigImpl(
             Long.MAX_VALUE,
             Long.MAX_VALUE,
-            SHUT_DOWN // doesn't matter, given the bounds
+            SHUT_DOWN, // doesn't matter, given the bounds
+            getLogConfig()
         );
     }
 
     @Override
     public Suppressed.StrictBufferConfig shutDownWhenFull() {
-        return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN);
+        return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN, getLogConfig());
     }
 
     @Override
     public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
-        return new EagerBufferConfigImpl(maxRecords(), maxBytes());
+        return new EagerBufferConfigImpl(maxRecords(), maxBytes(), getLogConfig());
     }
 
     public abstract boolean isLoggingEnabled();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index c56532d..7665e66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -28,15 +28,9 @@ public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.Eager
     private final long maxBytes;
     private final Map<String, String> logConfig;
 
-    public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) {
-        this.maxRecords = maxRecords;
-        this.maxBytes = maxBytes;
-        this.logConfig = Collections.emptyMap();
-    }
-
-    private EagerBufferConfigImpl(final long maxRecords,
-                                  final long maxBytes,
-                                  final Map<String, String> logConfig) {
+    public EagerBufferConfigImpl(final long maxRecords,
+                                 final long maxBytes,
+                                 final Map<String, String> logConfig) {
         this.maxRecords = maxRecords;
         this.maxBytes = maxBytes;
         this.logConfig = logConfig;
@@ -97,16 +91,20 @@ public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.Eager
         }
         final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
         return maxRecords == that.maxRecords &&
-            maxBytes == that.maxBytes;
+            maxBytes == that.maxBytes &&
+            Objects.equals(getLogConfig(), that.getLogConfig());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(maxRecords, maxBytes);
+        return Objects.hash(maxRecords, maxBytes, getLogConfig());
     }
 
     @Override
     public String toString() {
-        return "EagerBufferConfigImpl{maxRecords=" + maxRecords + ", maxBytes=" + maxBytes + '}';
+        return "EagerBufferConfigImpl{maxRecords=" + maxRecords +
+                ", maxBytes=" + maxBytes +
+                ", logConfig=" + getLogConfig() +
+                "}";
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
index 13ffccd..2ca5ef9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
@@ -41,14 +41,6 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
         this.logConfig = logConfig;
     }
 
-    public StrictBufferConfigImpl(final long maxRecords,
-                                  final long maxBytes,
-                                  final BufferFullStrategy bufferFullStrategy) {
-        this.maxRecords = maxRecords;
-        this.maxBytes = maxBytes;
-        this.bufferFullStrategy = bufferFullStrategy;
-        this.logConfig = Collections.emptyMap();
-    }
 
     public StrictBufferConfigImpl() {
         this.maxRecords = Long.MAX_VALUE;
@@ -59,12 +51,12 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
 
     @Override
     public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) {
-        return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy);
+        return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy, getLogConfig());
     }
 
     @Override
     public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
-        return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy);
+        return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy, getLogConfig());
     }
 
     @Override
@@ -113,18 +105,21 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
         final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o;
         return maxRecords == that.maxRecords &&
             maxBytes == that.maxBytes &&
-            bufferFullStrategy == that.bufferFullStrategy;
+            bufferFullStrategy == that.bufferFullStrategy &&
+            Objects.equals(getLogConfig(), ((StrictBufferConfigImpl) o).getLogConfig());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(maxRecords, maxBytes, bufferFullStrategy);
+        return Objects.hash(maxRecords, maxBytes, bufferFullStrategy, getLogConfig());
     }
 
     @Override
     public String toString() {
         return "StrictBufferConfigImpl{maxKeys=" + maxRecords +
             ", maxBytes=" + maxBytes +
-            ", bufferFullStrategy=" + bufferFullStrategy + '}';
+            ", bufferFullStrategy=" + bufferFullStrategy +
+            ", logConfig=" + getLogConfig().toString() +
+             '}';
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
index 112b9eb..b799884 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImp
 import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.junit.Test;
 
+import java.util.Collections;
+
 import static java.lang.Long.MAX_VALUE;
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
@@ -46,13 +48,19 @@ public class SuppressedTest {
         assertThat(
             "keys alone should be set",
             maxRecords(2L),
-            is(new EagerBufferConfigImpl(2L, MAX_VALUE))
+            is(new EagerBufferConfigImpl(2L, MAX_VALUE, Collections.emptyMap()))
         );
 
         assertThat(
             "size alone should be set",
             maxBytes(2L),
-            is(new EagerBufferConfigImpl(MAX_VALUE, 2L))
+            is(new EagerBufferConfigImpl(MAX_VALUE, 2L, Collections.emptyMap()))
+        );
+
+        assertThat(
+            "config should be set even after max records",
+            maxRecords(2L).withMaxBytes(4L).withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")),
+            is(new EagerBufferConfigImpl(2L, 4L, Collections.singletonMap("myConfigKey", "myConfigValue")))
         );
     }
 
@@ -91,7 +99,13 @@ public class SuppressedTest {
         assertThat(
             "all constraints should be set",
             untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)),
-            is(new SuppressedInternal<>(null, ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null, false))
+            is(new SuppressedInternal<>(null, ofMillis(2), new EagerBufferConfigImpl(3L, 2L, Collections.emptyMap()), null, false))
+        );
+
+        assertThat(
+            "config is not lost early emit is set",
+            untilTimeLimit(ofMillis(2), maxRecords(2L).withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")).emitEarlyWhenFull()),
+            is(new SuppressedInternal<>(null, ofMillis(2), new EagerBufferConfigImpl(2L, MAX_VALUE, Collections.singletonMap("myConfigKey", "myConfigValue")), null, false))
         );
     }
 
@@ -105,13 +119,13 @@ public class SuppressedTest {
 
         assertThat(
             untilWindowCloses(maxRecords(2L).shutDownWhenFull()),
-            is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN))
+            is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN, Collections.emptyMap()))
             )
         );
 
         assertThat(
             untilWindowCloses(maxBytes(2L).shutDownWhenFull()),
-            is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN))
+            is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.emptyMap()))
             )
         );
 
@@ -122,14 +136,79 @@ public class SuppressedTest {
 
         assertThat(
             untilWindowCloses(maxRecords(2L).shutDownWhenFull()).withName("name"),
-            is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN))
+            is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN, Collections.emptyMap()))
             )
         );
 
         assertThat(
             untilWindowCloses(maxBytes(2L).shutDownWhenFull()).withName("name"),
-            is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN))
+            is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.emptyMap()))
             )
         );
+
+        assertThat(
+            "config is not lost when shutdown when full is set",
+            untilWindowCloses(maxBytes(2L).withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")).shutDownWhenFull()),
+            is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.singletonMap("myConfigKey", "myConfigValue"))))
+        );
+    }
+
+    @Test
+    public void supportLongChainOfMethods() {
+        final Suppressed.BufferConfig<Suppressed.EagerBufferConfig> bufferConfig = unbounded()
+            .emitEarlyWhenFull()
+            .withMaxRecords(3L)
+            .withMaxBytes(4L)
+            .withMaxRecords(5L)
+            .withMaxBytes(6L);
+
+        assertThat(
+            "long chain of eager buffer config sets attributes properly",
+            bufferConfig,
+            is(new EagerBufferConfigImpl(5L, 6L, Collections.emptyMap()))
+        );
+        assertThat(
+            "long chain of strict buffer config sets attributes properly",
+            bufferConfig.shutDownWhenFull(),
+            is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN, Collections.emptyMap()))
+        );
+
+        final Suppressed.BufferConfig<Suppressed.EagerBufferConfig> bufferConfigWithLogging = unbounded()
+            .withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue"))
+            .emitEarlyWhenFull()
+            .withMaxRecords(3L)
+            .withMaxBytes(4L)
+            .withMaxRecords(5L)
+            .withMaxBytes(6L);
+
+        assertThat(
+            "long chain of eager buffer config sets attributes properly with logging enabled",
+            bufferConfigWithLogging,
+            is(new EagerBufferConfigImpl(5L, 6L, Collections.singletonMap("myConfigKey", "myConfigValue")))
+        );
+        assertThat(
+            "long chain of strict buffer config sets attributes properly with logging enabled",
+            bufferConfigWithLogging.shutDownWhenFull(),
+            is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN, Collections.singletonMap("myConfigKey", "myConfigValue")))
+        );
+
+        final Suppressed.BufferConfig<Suppressed.EagerBufferConfig> bufferConfigWithLoggingCalledAtTheEnd = unbounded()
+            .emitEarlyWhenFull()
+            .withMaxRecords(3L)
+            .withMaxBytes(4L)
+            .withMaxRecords(5L)
+            .withMaxBytes(6L)
+            .withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue"));
+
+        assertThat(
+            "long chain of eager buffer config sets logging even after other setters",
+                bufferConfigWithLoggingCalledAtTheEnd,
+            is(new EagerBufferConfigImpl(5L, 6L, Collections.singletonMap("myConfigKey", "myConfigValue")))
+        );
+        assertThat(
+            "long chain of strict buffer config sets logging even after other setters",
+                bufferConfigWithLoggingCalledAtTheEnd.shutDownWhenFull(),
+            is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN, Collections.singletonMap("myConfigKey", "myConfigValue")))
+        );
     }
 }