You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/04/15 23:27:30 UTC
[kafka] branch trunk updated: KAFKA-7778: Add KTable.suppress to
Scala API (#6314)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b7d7f75 KAFKA-7778: Add KTable.suppress to Scala API (#6314)
b7d7f75 is described below
commit b7d7f7590de405d8d527c32ca9f067e073e9d83b
Author: Casey Green <gr...@gmail.com>
AuthorDate: Mon Apr 15 18:27:19 2019 -0500
KAFKA-7778: Add KTable.suppress to Scala API (#6314)
Detailed description
* Adds KTable.suppress to the Scala API.
* Fixed count in KGroupedStream, SessionWindowedKStream, and TimeWindowedKStream so that the value serde gets passed down to the KTable returned by the internal mapValues method.
* Suppress API support for Java 1.8 + Scala 2.11
Testing strategy
I added unit tests covering:
* Windowed KTable.count.suppress w/ Suppressed.untilTimeLimit
* Windowed KTable.count.suppress w/ Suppressed.untilWindowCloses
* Non-windowed KTable.count.suppress w/ Suppressed.untilTimeLimit
* Session-windowed KTable.count.suppress w/ Suppressed.untilWindowCloses
Reviewers: John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../apache/kafka/streams/kstream/Suppressed.java | 15 +-
.../internals/suppress/BufferConfigInternal.java | 2 +-
.../internals/suppress/EagerBufferConfigImpl.java | 6 +-
.../streams/scala/kstream/KGroupedStream.scala | 11 +-
.../kafka/streams/scala/kstream/KTable.scala | 12 ++
.../scala/kstream/SessionWindowedKStream.scala | 11 +-
.../kafka/streams/scala/kstream/Suppressed.scala | 128 ++++++++++++
.../scala/kstream/TimeWindowedKStream.scala | 11 +-
.../kafka/streams/scala/kstream/KTableTest.scala | 224 +++++++++++++++++++++
.../streams/scala/kstream/SuppressedTest.scala | 96 +++++++++
10 files changed, 500 insertions(+), 16 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index b5d7937..c80eaec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -33,11 +33,20 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
}
+ /**
+ * Marker interface for a buffer configuration that will strictly enforce size constraints
+ * (bytes and/or number of records) on the buffer, so it is suitable for reducing duplicate
+ * results downstream, but does not promise to eliminate them entirely.
+ */
+ interface EagerBufferConfig extends BufferConfig<EagerBufferConfig> {
+
+ }
+
interface BufferConfig<BC extends BufferConfig<BC>> {
/**
* Create a size-constrained buffer in terms of the maximum number of keys it will store.
*/
- static BufferConfig<?> maxRecords(final long recordLimit) {
+ static EagerBufferConfig maxRecords(final long recordLimit) {
return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
}
@@ -49,7 +58,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
/**
* Create a size-constrained buffer in terms of the maximum number of bytes it will use.
*/
- static BufferConfig<?> maxBytes(final long byteLimit) {
+ static EagerBufferConfig maxBytes(final long byteLimit) {
return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
}
@@ -108,7 +117,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
* This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
* duplicate results downstream, but does not promise to eliminate them.
*/
- BufferConfig emitEarlyWhenFull();
+ EagerBufferConfig emitEarlyWhenFull();
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
index 10675ef..2087945 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
@@ -43,7 +43,7 @@ abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC>> impl
}
@Override
- public Suppressed.BufferConfig emitEarlyWhenFull() {
+ public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
return new EagerBufferConfigImpl(maxRecords(), maxBytes());
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index e94abc1..1c1b30c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -20,7 +20,7 @@ import org.apache.kafka.streams.kstream.Suppressed;
import java.util.Objects;
-public class EagerBufferConfigImpl extends BufferConfigInternal {
+public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.EagerBufferConfig> implements Suppressed.EagerBufferConfig {
private final long maxRecords;
private final long maxBytes;
@@ -31,12 +31,12 @@ public class EagerBufferConfigImpl extends BufferConfigInternal {
}
@Override
- public Suppressed.BufferConfig withMaxRecords(final long recordLimit) {
+ public Suppressed.EagerBufferConfig withMaxRecords(final long recordLimit) {
return new EagerBufferConfigImpl(recordLimit, maxBytes);
}
@Override
- public Suppressed.BufferConfig withMaxBytes(final long byteLimit) {
+ public Suppressed.EagerBufferConfig withMaxBytes(final long byteLimit) {
return new EagerBufferConfigImpl(maxRecords, byteLimit);
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 5168805..98fb12b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -20,7 +20,8 @@
package org.apache.kafka.streams.scala
package kstream
-import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
+import org.apache.kafka.streams.kstream.internals.KTableImpl
+import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, KTable => KTableJ, _}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionsCompatConversions._
@@ -46,9 +47,13 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
*/
def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = {
- val c: KTable[K, java.lang.Long] =
+ val javaCountTable: KTableJ[K, java.lang.Long] =
inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
- c.mapValues[Long](Long2long _)
+ val tableImpl = javaCountTable.asInstanceOf[KTableImpl[K, ByteArrayKeyValueStore, java.lang.Long]]
+ javaCountTable.mapValues[Long](
+ ((l: java.lang.Long) => Long2long(l)).asValueMapper,
+ Materialized.`with`[K, Long, ByteArrayKeyValueStore](tableImpl.keySerde(), Serdes.Long)
+ )
}
/**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 9ac27ee..20ee08b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -160,6 +160,18 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
inner.toStream[KR](mapper.asKeyValueMapper)
/**
+ * Suppress some updates from this changelog stream, determined by the supplied [[Suppressed]] configuration.
+ *
+ * This controls what updates downstream table and stream operations will receive.
+ *
+ * @param suppressed Configuration object determining what, if any, updates to suppress.
+ * @return A new KTable with the desired suppression characteristics.
+ * @see `org.apache.kafka.streams.kstream.KTable#suppress`
+ */
+ def suppress(suppressed: Suppressed[_ >: K]): KTable[K, V] =
+ inner.suppress(suppressed)
+
+ /**
* Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type).
* Transform the value of each input record into a new value (with possible new type) of the output record.
* A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each input
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index 6571df9..a3e8ae2 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -20,7 +20,8 @@
package org.apache.kafka.streams.scala
package kstream
-import org.apache.kafka.streams.kstream.{SessionWindowedKStream => SessionWindowedKStreamJ, _}
+import org.apache.kafka.streams.kstream.internals.KTableImpl
+import org.apache.kafka.streams.kstream.{KTable => KTableJ, SessionWindowedKStream => SessionWindowedKStreamJ, _}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionsCompatConversions._
@@ -60,9 +61,13 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
*/
def count()(implicit materialized: Materialized[K, Long, ByteArraySessionStore]): KTable[Windowed[K], Long] = {
- val c: KTable[Windowed[K], java.lang.Long] =
+ val javaCountTable: KTableJ[Windowed[K], java.lang.Long] =
inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]])
- c.mapValues[Long](Long2long _)
+ val tableImpl = javaCountTable.asInstanceOf[KTableImpl[Windowed[K], ByteArraySessionStore, java.lang.Long]]
+ javaCountTable.mapValues[Long](
+ ((l: java.lang.Long) => Long2long(l)).asValueMapper,
+ Materialized.`with`[Windowed[K], Long, ByteArrayKeyValueStore](tableImpl.keySerde(), Serdes.Long)
+ )
}
/**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala
new file mode 100644
index 0000000..2fdc09d
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+import java.time.Duration
+
+import org.apache.kafka.streams.kstream.{Windowed, Suppressed => SupressedJ}
+import org.apache.kafka.streams.kstream.Suppressed.{
+ EagerBufferConfig,
+ StrictBufferConfig,
+ BufferConfig => BufferConfigJ
+}
+import org.apache.kafka.streams.kstream.internals.suppress.{
+ EagerBufferConfigImpl,
+ FinalResultsSuppressionBuilder,
+ StrictBufferConfigImpl,
+ SuppressedInternal
+}
+
+/**
+ * Duplicates the static factory methods inside the Java interface [[org.apache.kafka.streams.kstream.Suppressed]].
+ *
+ * This is required for compatibility w/ Scala 2.11 + Java 1.8 because the Scala 2.11 compiler doesn't support the use
+ * of static methods inside Java interfaces.
+ *
+ * TODO: Deprecate this class if support for Scala 2.11 + Java 1.8 is dropped.
+ */
+object Suppressed {
+
+ /**
+ * Configure the suppression to emit only the "final results" from the window.
+ *
+ * By default all Streams operators emit results whenever new results are available.
+ * This includes windowed operations.
+ *
+ * This configuration will instead emit just one result per key for each window, guaranteeing
+ * to deliver only the final result. This option is suitable for use cases in which the business logic
+ * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
+ *
+ * To accomplish this, the operator will buffer events from the window until the window close (that is,
+ * until the end-time passes, and additionally until the grace period expires). Since windowed operators
+ * are required to reject late events for a window whose grace period is expired, there is an additional
+ * guarantee that the final results emitted from this suppression will match any queryable state upstream.
+ *
+ * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
+ * This is required to be a "strict" config, since it would violate the "final results"
+ * property to emit early and then issue an update later.
+ * @tparam K The [[Windowed]] key type for the KTable to apply this suppression to.
+ * @return a "final results" mode suppression configuration
+ * @see [[org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses]]
+ */
+ def untilWindowCloses[K](bufferConfig: StrictBufferConfig): SupressedJ[Windowed[K]] =
+ new FinalResultsSuppressionBuilder[Windowed[K]](null, bufferConfig)
+
+ /**
+ * Configure the suppression to wait `timeToWaitForMoreEvents` amount of time after receiving a record
+ * before emitting it further downstream. If another record for the same key arrives in the meantime, it replaces
+ * the first record in the buffer but does <em>not</em> re-start the timer.
+ *
+ * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
+ * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
+ * @tparam K The key type for the KTable to apply this suppression to.
+ * @return a suppression configuration
+ * @see [[org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit]]
+ */
+ def untilTimeLimit[K](timeToWaitForMoreEvents: Duration, bufferConfig: BufferConfigJ[_]): SupressedJ[K] =
+ new SuppressedInternal[K](null, timeToWaitForMoreEvents, bufferConfig, null, false)
+
+ /**
+ * Duplicates the static factory methods inside the Java interface
+ * [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig]].
+ */
+ object BufferConfig {
+
+ /**
+ * Create a size-constrained buffer in terms of the maximum number of keys it will store.
+ *
+ * @param recordLimit maximum number of keys to buffer.
+ * @return size-constrained buffer in terms of the maximum number of keys it will store.
+ * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords]]
+ */
+ def maxRecords(recordLimit: Long): EagerBufferConfig =
+ new EagerBufferConfigImpl(recordLimit, Long.MaxValue)
+
+ /**
+ * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
+ *
+ * @param byteLimit maximum number of bytes to buffer.
+ * @return size-constrained buffer in terms of the maximum number of bytes it will use.
+ * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes]]
+ */
+ def maxBytes(byteLimit: Long): EagerBufferConfig =
+ new EagerBufferConfigImpl(Long.MaxValue, byteLimit)
+
+ /**
+ * Create a buffer unconstrained by size (either keys or bytes).
+ *
+ * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
+ *
+ * If there isn't enough heap available to meet the demand, the application will encounter an
+ * [[OutOfMemoryError]] and shut down (not guaranteed to be a graceful exit). Also, note that
+ * JVM processes under extreme memory pressure may exhibit poor GC behavior.
+ *
+ * This is a convenient option if you doubt that your buffer will be that large, but also don't
+ * wish to pick particular constraints, such as in testing.
+ *
+ * This buffer is "strict" in the sense that it will enforce the time bound or crash.
+ * It will never emit early.
+ *
+ * @return a buffer unconstrained by size (either keys or bytes).
+ * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded]]
+ */
+ def unbounded(): StrictBufferConfig = new StrictBufferConfigImpl()
+ }
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index d84416e..c160600 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -20,7 +20,8 @@
package org.apache.kafka.streams.scala
package kstream
-import org.apache.kafka.streams.kstream.{TimeWindowedKStream => TimeWindowedKStreamJ, _}
+import org.apache.kafka.streams.kstream.internals.KTableImpl
+import org.apache.kafka.streams.kstream.{KTable => KTableJ, TimeWindowedKStream => TimeWindowedKStreamJ, _}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionsCompatConversions._
@@ -59,9 +60,13 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
*/
def count()(implicit materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K], Long] = {
- val c: KTable[Windowed[K], java.lang.Long] =
+ val javaCountTable: KTableJ[Windowed[K], java.lang.Long] =
inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]])
- c.mapValues[Long](Long2long _)
+ val tableImpl = javaCountTable.asInstanceOf[KTableImpl[Windowed[K], ByteArrayWindowStore, java.lang.Long]]
+ javaCountTable.mapValues[Long](
+ ((l: java.lang.Long) => Long2long(l)).asValueMapper,
+ Materialized.`with`[Windowed[K], Long, ByteArrayKeyValueStore](tableImpl.keySerde(), Serdes.Long)
+ )
}
/**
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index dc080f1..f2de3de 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -18,8 +18,12 @@
*/
package org.apache.kafka.streams.scala.kstream
+import java.time.Duration
+
+import org.apache.kafka.streams.kstream.{SessionWindows, TimeWindows, Windowed}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
import org.apache.kafka.streams.scala.utils.TestDriver
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
import org.junit.runner.RunWith
@@ -139,4 +143,224 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
testDriver.close()
}
+
+ "windowed KTable#suppress" should "correctly suppress results using Suppressed.untilTimeLimit" in {
+ val builder = new StreamsBuilder()
+ val sourceTopic = "source"
+ val sinkTopic = "sink"
+ val window = TimeWindows.of(Duration.ofSeconds(1L))
+ val suppression = Suppressed.untilTimeLimit[Windowed[String]](Duration.ofSeconds(2L), BufferConfig.unbounded())
+
+ val table: KTable[Windowed[String], Long] = builder
+ .stream[String, String](sourceTopic)
+ .groupByKey
+ .windowedBy(window)
+ .count
+ .suppress(suppression)
+
+ table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
+
+ val testDriver = createTestDriver(builder)
+
+ {
+ // publish key=1 @ time 0 => count==1
+ testDriver.pipeRecord(sourceTopic, ("1", "value1"), 0L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // publish key=1 @ time 1 => count==2
+ testDriver.pipeRecord(sourceTopic, ("1", "value2"), 1L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // move event time past the first window, but before the suppression window
+ testDriver.pipeRecord(sourceTopic, ("2", "value1"), 1001L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // move event time riiiight before suppression window ends
+ testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // publish a late event before suppression window terminates => count==3
+ testDriver.pipeRecord(sourceTopic, ("1", "value3"), 999L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // move event time right past the suppression window of the first window.
+ testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
+ val record = testDriver.readRecord[String, Long](sinkTopic)
+ record.key shouldBe "0:1000:1"
+ record.value shouldBe 3L
+ }
+ testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+ testDriver.close()
+ }
+
+ "windowed KTable#suppress" should "correctly suppress results using Suppressed.untilWindowCloses" in {
+ val builder = new StreamsBuilder()
+ val sourceTopic = "source"
+ val sinkTopic = "sink"
+ val window = TimeWindows.of(Duration.ofSeconds(1L)).grace(Duration.ofSeconds(1L))
+ val suppression = Suppressed.untilWindowCloses[String](BufferConfig.unbounded())
+
+ val table: KTable[Windowed[String], Long] = builder
+ .stream[String, String](sourceTopic)
+ .groupByKey
+ .windowedBy(window)
+ .count
+ .suppress(suppression)
+
+ table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
+
+ val testDriver = createTestDriver(builder)
+
+ {
+ // publish key=1 @ time 0 => count==1
+ testDriver.pipeRecord(sourceTopic, ("1", "value1"), 0L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // publish key=1 @ time 1 => count==2
+ testDriver.pipeRecord(sourceTopic, ("1", "value2"), 1L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // move event time past the window, but before the grace period
+ testDriver.pipeRecord(sourceTopic, ("2", "value1"), 1001L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // move event time riiiight before grace period ends
+ testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // publish a late event before grace period terminates => count==3
+ testDriver.pipeRecord(sourceTopic, ("1", "value3"), 999L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // move event time right past the grace period of the first window.
+ testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
+ val record = testDriver.readRecord[String, Long](sinkTopic)
+ record.key shouldBe "0:1000:1"
+ record.value shouldBe 3L
+ }
+ testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+ testDriver.close()
+ }
+
+ "session windowed KTable#suppress" should "correctly suppress results using Suppressed.untilWindowCloses" in {
+ val builder = new StreamsBuilder()
+ val sourceTopic = "source"
+ val sinkTopic = "sink"
+ // Very similar to SuppressScenarioTest.shouldSupportFinalResultsForSessionWindows
+ val window = SessionWindows.`with`(Duration.ofMillis(5L)).grace(Duration.ofMillis(10L))
+ val suppression = Suppressed.untilWindowCloses[String](BufferConfig.unbounded())
+
+ val table: KTable[Windowed[String], Long] = builder
+ .stream[String, String](sourceTopic)
+ .groupByKey
+ .windowedBy(window)
+ .count
+ .suppress(suppression)
+
+ table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
+
+ val testDriver = createTestDriver(builder)
+
+ {
+ // first window
+ testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 0L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // first window
+ testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 1L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // new window, but grace period hasn't ended for first window
+ testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 8L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // late event for first window, included since grace period hasn't passed
+ testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 2L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // push stream time forward to flush other events through
+ testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 30L)
+ // too-late event should get dropped from the stream
+ testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 3L)
+ // should now have two results
+ val r1 = testDriver.readRecord[String, Long](sinkTopic)
+ r1.key shouldBe "0:2:k1"
+ r1.value shouldBe 3L
+ val r2 = testDriver.readRecord[String, Long](sinkTopic)
+ r2.key shouldBe "8:8:k1"
+ r2.value shouldBe 1
+ }
+ testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+ testDriver.close()
+ }
+
+ "non-windowed KTable#suppress" should "correctly suppress results using Suppressed.untilTimeLimit" in {
+ val builder = new StreamsBuilder()
+ val sourceTopic = "source"
+ val sinkTopic = "sink"
+ val suppression = Suppressed.untilTimeLimit[String](Duration.ofSeconds(2L), BufferConfig.unbounded())
+
+ val table: KTable[String, Long] = builder
+ .stream[String, String](sourceTopic)
+ .groupByKey
+ .count
+ .suppress(suppression)
+
+ table.toStream.to(sinkTopic)
+
+ val testDriver = createTestDriver(builder)
+
+ {
+ // publish key=1 @ time 0 => count==1
+ testDriver.pipeRecord(sourceTopic, ("1", "value1"), 0L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // publish key=1 @ time 1 => count==2
+ testDriver.pipeRecord(sourceTopic, ("1", "value2"), 1L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // move event time forward, but not past the suppression time limit
+ testDriver.pipeRecord(sourceTopic, ("2", "value1"), 1001L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // move event time riiiight before the suppression time limit expires
+ testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // publish an out-of-order event before the suppression time limit expires => count==3
+ testDriver.pipeRecord(sourceTopic, ("1", "value3"), 999L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // move event time right past the suppression time limit for key=1.
+ testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
+ val record = testDriver.readRecord[String, Long](sinkTopic)
+ record.key shouldBe "1"
+ record.value shouldBe 3L
+ }
+ testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+ testDriver.close()
+ }
}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala
new file mode 100644
index 0000000..5df8347
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+import java.time.Duration
+
+import org.apache.kafka.streams.kstream.internals.suppress.{
+ BufferFullStrategy,
+ EagerBufferConfigImpl,
+ FinalResultsSuppressionBuilder,
+ StrictBufferConfigImpl,
+ SuppressedInternal
+}
+import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class SuppressedTest extends FlatSpec with Matchers {
+
+ "Suppressed.untilWindowCloses" should "produce the correct suppression" in {
+ val bufferConfig = BufferConfig.unbounded()
+ val suppression = Suppressed.untilWindowCloses[String](bufferConfig)
+ suppression shouldEqual new FinalResultsSuppressionBuilder(null, bufferConfig)
+ suppression.withName("soup") shouldEqual new FinalResultsSuppressionBuilder("soup", bufferConfig)
+ }
+
+ "Suppressed.untilTimeLimit" should "produce the correct suppression" in {
+ val bufferConfig = BufferConfig.unbounded()
+ val duration = Duration.ofMillis(1)
+ Suppressed.untilTimeLimit[String](duration, bufferConfig) shouldEqual
+ new SuppressedInternal[String](null, duration, bufferConfig, null, false)
+ }
+
+ "BufferConfig.maxRecords" should "produce the correct buffer config" in {
+ BufferConfig.maxRecords(4) shouldEqual new EagerBufferConfigImpl(4, Long.MaxValue)
+ BufferConfig.maxRecords(4).withMaxBytes(5) shouldEqual new EagerBufferConfigImpl(4, 5)
+ }
+
+ "BufferConfig.maxBytes" should "produce the correct buffer config" in {
+ BufferConfig.maxBytes(4) shouldEqual new EagerBufferConfigImpl(Long.MaxValue, 4)
+ BufferConfig.maxBytes(4).withMaxRecords(5) shouldEqual new EagerBufferConfigImpl(5, 4)
+ }
+
+ "BufferConfig.unbounded" should "produce the correct buffer config" in {
+ BufferConfig.unbounded() shouldEqual
+ new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue, BufferFullStrategy.SHUT_DOWN)
+ }
+
+ "BufferConfig" should "support very long chains of factory methods" in {
+ val bc1 = BufferConfig
+ .unbounded()
+ .emitEarlyWhenFull()
+ .withMaxRecords(3L)
+ .withMaxBytes(4L)
+ .withMaxRecords(5L)
+ .withMaxBytes(6L)
+ bc1 shouldEqual new EagerBufferConfigImpl(5L, 6L)
+ bc1.shutDownWhenFull() shouldEqual new StrictBufferConfigImpl(5L, 6L, BufferFullStrategy.SHUT_DOWN)
+
+ val bc2 = BufferConfig
+ .maxBytes(4)
+ .withMaxRecords(5)
+ .withMaxBytes(6)
+ .withNoBound()
+ .withMaxBytes(7)
+ .withMaxRecords(8)
+
+ bc2 shouldEqual new StrictBufferConfigImpl(8L, 7L, BufferFullStrategy.SHUT_DOWN)
+ bc2.withNoBound() shouldEqual BufferConfig.unbounded()
+
+ val bc3 = BufferConfig
+ .maxRecords(5L)
+ .withMaxBytes(10L)
+ .emitEarlyWhenFull()
+ .withMaxRecords(11L)
+
+ bc3 shouldEqual new EagerBufferConfigImpl(11L, 10L)
+ }
+}