You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/15 01:29:30 UTC
[kafka] branch trunk updated: MINOR: Remove deprecated callers (#5911)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 14d3ead MINOR: Remove deprecated callers (#5911)
14d3ead is described below
commit 14d3ead19d250f2f3117af473ff6244c663ef8ca
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Wed Nov 14 17:29:19 2018 -0800
MINOR: Remove deprecated callers (#5911)
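Callers of the deprecated 1) Windows#until, 2) Windows#of, and 3) Serialized APIs are replaced, where possible, with their non-deprecated counterparts (grace(Duration), the Duration-based window factories, and Grouped, respectively).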
Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bi...@confluent.io>
---
.../examples/pageview/PageViewTypedDemo.java | 4 +--
.../examples/pageview/PageViewUntypedDemo.java | 4 +--
.../apache/kafka/streams/kstream/JoinWindows.java | 4 +--
.../org/apache/kafka/streams/kstream/KStream.java | 2 +-
.../org/apache/kafka/streams/kstream/KTable.java | 2 +-
.../kafka/streams/kstream/KeyValueMapper.java | 4 +--
.../kafka/streams/kstream/SessionWindows.java | 2 +-
.../KStreamAggregationDedupIntegrationTest.java | 10 ++----
.../KStreamAggregationIntegrationTest.java | 16 ++++------
.../kafka/streams/kstream/JoinWindowsTest.java | 34 ++++++++------------
.../kstream/RepartitionTopicNamingTest.java | 29 +++++++++--------
.../kafka/streams/kstream/SessionWindowsTest.java | 17 ++++------
.../kafka/streams/kstream/TimeWindowsTest.java | 25 ++++++---------
.../kstream/internals/KGroupedStreamImplTest.java | 4 +--
.../kstream/internals/KGroupedTableImplTest.java | 13 +++-----
.../streams/kstream/internals/KStreamImplTest.java | 37 +++++++++++-----------
.../internals/KStreamWindowAggregateTest.java | 2 +-
.../kstream/internals/KStreamWindowReduceTest.java | 6 ++--
.../kstream/internals/KTableAggregateTest.java | 6 ++--
.../streams/kstream/internals/KTableImplTest.java | 6 ++--
.../internals/KTableKTableLeftJoinTest.java | 25 +++------------
.../internals/KTableTransformValuesTest.java | 7 ++--
.../internals/SessionWindowedKStreamImplTest.java | 4 +--
.../kstream/internals/SuppressScenarioTest.java | 7 ++--
.../internals/TimeWindowedKStreamImplTest.java | 4 +--
.../apache/kafka/streams/perf/YahooBenchmark.java | 4 +--
.../streams/tests/BrokerCompatibilityTest.java | 4 +--
.../kafka/streams/tests/SmokeTestClient.java | 7 ++--
28 files changed, 125 insertions(+), 164 deletions(-)
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 18b4912..f09ac80 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
@@ -170,7 +171,6 @@ public class PageViewTypedDemo {
public String region;
}
- @SuppressWarnings("deprecation")
public static void main(final String[] args) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
@@ -206,7 +206,7 @@ public class PageViewTypedDemo {
return viewByRegion;
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
- .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), new JSONSerde<>()))
+ .groupByKey(Grouped.with(Serdes.String(), new JSONSerde<>()))
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
.count()
.toStream()
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index d492238..2a9972b 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
@@ -54,7 +55,6 @@ import java.util.Properties;
*/
public class PageViewUntypedDemo {
- @SuppressWarnings("deprecation")
public static void main(final String[] args) throws Exception {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
@@ -87,7 +87,7 @@ public class PageViewUntypedDemo {
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
- .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), jsonSerde))
+ .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
.count()
.toStream()
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 62eade4..2087009 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -246,7 +246,7 @@ public final class JoinWindows extends Windows<Window> {
* @param durationMs the window retention time in milliseconds
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
- * @deprecated since 2.1. Use {@link JoinWindows#grace(Duration)} instead.
+ * @deprecated since 2.1. Use {@link Materialized#withRetention(Duration)} instead.
*/
@SuppressWarnings("deprecation")
@Override
@@ -264,7 +264,7 @@ public final class JoinWindows extends Windows<Window> {
* For {@link TimeWindows} the maintain duration is at least as large as the window size.
*
* @return the window maintain duration
- * @deprecated since 2.1. Use {@link JoinWindows#gracePeriodMs()} instead.
+ * @deprecated since 2.1. This function should not be used anymore as retention period can be specified via {@link Materialized#withRetention(Duration)}.
*/
@SuppressWarnings("deprecation")
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 77987a9..6055199 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -874,7 +874,7 @@ public interface KStream<K, V> {
* and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
* <p>
* This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
- * If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serialized)} instead.
+ * If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Grouped)} instead.
*
* @param selector a {@link KeyValueMapper} that computes a new key for grouping
* @param <KR> the key type of the result {@link KGroupedStream}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index b49f3e4..5ed0270 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -573,7 +573,7 @@ public interface KTable<K, V> {
* records to and rereading all updated records from it, such that the resulting {@link KGroupedTable} is partitioned
* on the new key.
* <p>
- * If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serialized)}
+ * If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Grouped)}
* instead.
*
* @param selector a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
index 2a56a05..1112fbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -39,9 +39,9 @@ import org.apache.kafka.streams.KeyValue;
* @see KStream#flatMap(KeyValueMapper)
* @see KStream#selectKey(KeyValueMapper)
* @see KStream#groupBy(KeyValueMapper)
- * @see KStream#groupBy(KeyValueMapper, Serialized)
+ * @see KStream#groupBy(KeyValueMapper, Grouped)
* @see KTable#groupBy(KeyValueMapper)
- * @see KTable#groupBy(KeyValueMapper, Serialized)
+ * @see KTable#groupBy(KeyValueMapper, Grouped)
* @see KTable#toStream(KeyValueMapper)
*/
public interface KeyValueMapper<K, V, VR> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 02c7cbf..526c9d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -89,7 +89,7 @@ public final class SessionWindows {
* @return a new window specification with default maintain duration of 1 day
*
* @throws IllegalArgumentException if {@code inactivityGapMs} is zero or negative
- * @deprecated User {@link #with(Duration)} instead.
+ * @deprecated Use {@link #with(Duration)} instead.
*/
@Deprecated
public static SessionWindows with(final long inactivityGapMs) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 1db23a5..7493b06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -78,7 +79,6 @@ public class KStreamAggregationDedupIntegrationTest {
private Reducer<String> reducer;
private KStream<Integer, String> stream;
- @SuppressWarnings("deprecation")
@Before
public void before() throws InterruptedException {
testNo++;
@@ -96,10 +96,7 @@ public class KStreamAggregationDedupIntegrationTest {
final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
- groupedStream = stream
- .groupBy(
- mapper,
- org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()));
+ groupedStream = stream.groupBy(mapper, Grouped.with(Serdes.String(), Serdes.String()));
reducer = (value1, value2) -> value1 + ":" + value2;
}
@@ -173,14 +170,13 @@ public class KStreamAggregationDedupIntegrationTest {
);
}
- @SuppressWarnings("deprecation")
@Test
public void shouldGroupByKey() throws Exception {
final long timestamp = mockTime.milliseconds();
produceMessages(timestamp);
produceMessages(timestamp);
- stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.Integer(), Serdes.String()))
+ stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(500L)))
.count(Materialized.as("count-windows"))
.toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start())
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 718483b..04cc0e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
@@ -92,7 +93,7 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-@SuppressWarnings({"unchecked", "deprecation"})
+@SuppressWarnings("unchecked")
@Category({IntegrationTest.class})
public class KStreamAggregationIntegrationTest {
private static final int NUM_BROKERS = 1;
@@ -135,10 +136,7 @@ public class KStreamAggregationIntegrationTest {
final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
- groupedStream = stream
- .groupBy(
- mapper,
- org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()));
+ groupedStream = stream.groupBy(mapper, Grouped.with(Serdes.String(), Serdes.String()));
reducer = (value1, value2) -> value1 + ":" + value2;
initializer = () -> 0;
@@ -428,7 +426,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(timestamp);
produceMessages(timestamp);
- stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.Integer(), Serdes.String()))
+ stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(500L)))
.count()
.toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
@@ -521,7 +519,7 @@ public class KStreamAggregationIntegrationTest {
final CountDownLatch latch = new CountDownLatch(11);
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
- .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(ofMillis(sessionGap)))
.count()
.toStream()
@@ -619,7 +617,7 @@ public class KStreamAggregationIntegrationTest {
final CountDownLatch latch = new CountDownLatch(11);
final String userSessionsStore = "UserSessionsStore";
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
- .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(ofMillis(sessionGap)))
.reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
.toStream()
@@ -706,7 +704,7 @@ public class KStreamAggregationIntegrationTest {
final CountDownLatch latch = new CountDownLatch(5);
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
- .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(UnlimitedWindows.of().startOn(ofEpochMilli(startTime)))
.count()
.toStream()
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 7127c9f..dda2da4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -72,12 +72,11 @@ public class JoinWindowsTest {
}
}
- @Deprecated
@Test
- public void untilShouldSetMaintainDuration() {
+ public void untilShouldSetGraceDuration() {
final JoinWindows windowSpec = JoinWindows.of(ofMillis(ANY_SIZE));
final long windowSize = windowSpec.size();
- assertEquals(windowSize, windowSpec.until(windowSize).maintainMs());
+ assertEquals(windowSize, windowSpec.grace(ofMillis(windowSize)).gracePeriodMs());
}
@Deprecated
@@ -115,16 +114,16 @@ public class JoinWindowsTest {
verifyEquality(JoinWindows.of(ofMillis(3)).grace(ofMillis(2)), JoinWindows.of(ofMillis(3)).grace(ofMillis(2)));
- verifyEquality(JoinWindows.of(ofMillis(3)).until(60), JoinWindows.of(ofMillis(3)).until(60));
+ verifyEquality(JoinWindows.of(ofMillis(3)).grace(ofMillis(60)), JoinWindows.of(ofMillis(3)).grace(ofMillis(60)));
verifyEquality(
- JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60),
- JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60)
+ JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)),
+ JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60))
);
// JoinWindows is a little weird in that before and after set the same fields as of.
verifyEquality(
- JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60),
- JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60)
+ JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)),
+ JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60))
);
}
@@ -138,27 +137,22 @@ public class JoinWindowsTest {
verifyInEquality(JoinWindows.of(ofMillis(3)).grace(ofMillis(9)), JoinWindows.of(ofMillis(3)).grace(ofMillis(2)));
- verifyInEquality(JoinWindows.of(ofMillis(3)).until(90), JoinWindows.of(ofMillis(3)).until(60));
-
+ verifyInEquality(JoinWindows.of(ofMillis(3)).grace(ofMillis(90)), JoinWindows.of(ofMillis(3)).grace(ofMillis(60)));
- verifyInEquality(
- JoinWindows.of(ofMillis(3)).before(ofMillis(9)).after(ofMillis(2)).grace(ofMillis(3)).until(60),
- JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60)
- );
verifyInEquality(
- JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(9)).grace(ofMillis(3)).until(60),
- JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60)
+ JoinWindows.of(ofMillis(3)).before(ofMillis(9)).after(ofMillis(2)).grace(ofMillis(3)),
+ JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3))
);
verifyInEquality(
- JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)).until(60),
- JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60)
+ JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(9)).grace(ofMillis(3)),
+ JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3))
);
verifyInEquality(
- JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(90),
- JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60)
+ JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)),
+ JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3))
);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index 3f43691..929bc3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.junit.Test;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
@@ -102,8 +103,8 @@ public class RepartitionTopicNamingTest {
try {
final StreamsBuilder builder = new StreamsBuilder();
final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping"));
- kGroupedStream.windowedBy(TimeWindows.of(10)).count();
- kGroupedStream.windowedBy(TimeWindows.of(30)).count();
+ kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
+ kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
builder.build();
fail("Should not build re-using repartition topic name");
} catch (final TopologyException te) {
@@ -121,8 +122,8 @@ public class RepartitionTopicNamingTest {
final KStream<String, String> stream2 = builder.<String, String>stream("topic2").selectKey((k, v) -> k);
final KStream<String, String> stream3 = builder.<String, String>stream("topic3").selectKey((k, v) -> k);
- final KStream<String, String> joined = stream1.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.of(30), Joined.named("join-repartition"));
- joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(30), Joined.named("join-repartition"));
+ final KStream<String, String> joined = stream1.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(30L)), Joined.named("join-repartition"));
+ joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(30L)), Joined.named("join-repartition"));
builder.build();
fail("Should not build re-using repartition topic name");
} catch (final TopologyException te) {
@@ -137,8 +138,8 @@ public class RepartitionTopicNamingTest {
final Properties properties = new Properties();
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping"));
- kGroupedStream.windowedBy(TimeWindows.of(10)).count();
- kGroupedStream.windowedBy(TimeWindows.of(30)).count();
+ kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
+ kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
builder.build(properties);
}
@@ -272,9 +273,9 @@ public class RepartitionTopicNamingTest {
if (isGroupByKey) {
if (otherOperations) {
- selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(10)).count();
+ selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
} else {
- selectKeyStream.groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(10)).count();
+ selectKeyStream.groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
}
} else {
if (otherOperations) {
@@ -297,15 +298,15 @@ public class RepartitionTopicNamingTest {
final String groupedSessionWindowRepartitionTopicName = "session-window-grouping";
if (isGroupByKey) {
if (otherOperations) {
- selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+ selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
} else {
- selectKeyStream.groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+ selectKeyStream.groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
}
} else {
if (otherOperations) {
- selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+ selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
} else {
- selectKeyStream.groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+ selectKeyStream.groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
}
}
@@ -356,7 +357,7 @@ public class RepartitionTopicNamingTest {
final String joinRepartitionTopicName = "my-join";
updatedStreamOne.join(updatedStreamTwo, (v1, v2) -> v1 + v2,
- JoinWindows.of(1000), Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), joinRepartitionTopicName));
+ JoinWindows.of(Duration.ofMillis(1000L)), Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), joinRepartitionTopicName));
return builder.build().describe().toString();
}
@@ -403,7 +404,7 @@ public class RepartitionTopicNamingTest {
mappedStream.filter((k, v) -> k.equals("A"))
.join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
- JoinWindows.of(5000),
+ JoinWindows.of(Duration.ofMillis(5000L)),
Joined.with(Serdes.String(), Serdes.String(), Serdes.Long(), fourthRepartitionTopicName))
.to(JOINED_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 369bbad..eac978c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -33,11 +33,10 @@ public class SessionWindowsTest {
assertEquals(anyGap, SessionWindows.with(ofMillis(anyGap)).inactivityGap());
}
- @Deprecated
@Test
- public void shouldSetWindowRetentionTime() {
+ public void shouldSetWindowGraceTime() {
final long anyRetentionTime = 42L;
- assertEquals(anyRetentionTime, SessionWindows.with(ofMillis(1)).until(anyRetentionTime).maintainMs());
+ assertEquals(anyRetentionTime, SessionWindows.with(ofMillis(1)).grace(ofMillis(anyRetentionTime)).gracePeriodMs());
}
@@ -88,9 +87,9 @@ public class SessionWindowsTest {
verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
- verifyEquality(SessionWindows.with(ofMillis(1)).until(7), SessionWindows.with(ofMillis(1)).until(7));
+ verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(7)));
- verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7));
+ verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).grace(ofMillis(7)));
}
@Test
@@ -99,12 +98,10 @@ public class SessionWindowsTest {
verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(9)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
- verifyInEquality(SessionWindows.with(ofMillis(1)).until(9), SessionWindows.with(ofMillis(1)).until(7));
-
- verifyInEquality(SessionWindows.with(ofMillis(2)).grace(ofMillis(6)).until(7), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7));
+ verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(9)), SessionWindows.with(ofMillis(1)).grace(ofMillis(7)));
- verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(0)).until(7), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7));
+ verifyInEquality(SessionWindows.with(ofMillis(2)).grace(ofMillis(6)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
- verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(70), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7));
+ verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(0)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 2bdb3a0..dcb691b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -160,11 +160,11 @@ public class TimeWindowsTest {
verifyEquality(TimeWindows.of(ofMillis(3)).grace(ofMillis(1)), TimeWindows.of(ofMillis(3)).grace(ofMillis(1)));
- verifyEquality(TimeWindows.of(ofMillis(3)).until(4), TimeWindows.of(ofMillis(3)).until(4));
+ verifyEquality(TimeWindows.of(ofMillis(3)).grace(ofMillis(4)), TimeWindows.of(ofMillis(3)).grace(ofMillis(4)));
verifyEquality(
- TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).until(4),
- TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).until(4)
+ TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4)),
+ TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4))
);
}
@@ -176,27 +176,22 @@ public class TimeWindowsTest {
verifyInEquality(TimeWindows.of(ofMillis(3)).grace(ofMillis(2)), TimeWindows.of(ofMillis(3)).grace(ofMillis(1)));
- verifyInEquality(TimeWindows.of(ofMillis(3)).until(9), TimeWindows.of(ofMillis(3)).until(4));
+ verifyInEquality(TimeWindows.of(ofMillis(3)).grace(ofMillis(9)), TimeWindows.of(ofMillis(3)).grace(ofMillis(4)));
verifyInEquality(
- TimeWindows.of(ofMillis(4)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4),
- TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4)
+ TimeWindows.of(ofMillis(4)).advanceBy(ofMillis(2)).grace(ofMillis(2)),
+ TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2))
);
verifyInEquality(
- TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(2)).until(4),
- TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4)
+ TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(2)),
+ TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2))
);
assertNotEquals(
- TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(1)).until(4),
- TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4)
- );
-
- assertNotEquals(
- TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(9),
- TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4)
+ TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(1)),
+ TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2))
);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 26b7e24..f2fc4f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
@@ -78,11 +79,10 @@ public class KGroupedStreamImplTest {
private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- @SuppressWarnings("deprecation")
@Before
public void before() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
- groupedStream = stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()));
+ groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index c06cce4..99f1b81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -204,13 +205,11 @@ public class KGroupedTableImplTest {
}
}
- @SuppressWarnings({"unchecked", "deprecation"})
+ @SuppressWarnings("unchecked")
@Test
public void shouldCountAndMaterializeResults() {
final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
- table.groupBy(MockMapper.selectValueKeyValueMapper(),
- org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(),
- Serdes.String()))
+ table.groupBy(MockMapper.selectValueKeyValueMapper(), Grouped.with(Serdes.String(), Serdes.String()))
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
@@ -223,13 +222,11 @@ public class KGroupedTableImplTest {
}
}
- @SuppressWarnings({"unchecked", "deprecation"})
+ @SuppressWarnings("unchecked")
@Test
public void shouldAggregateAndMaterializeResults() {
final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
- table.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(),
- org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(),
- Serdes.String()))
+ table.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
MockAggregator.TOSTRING_REMOVER,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 1f3492c..772836f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
@@ -54,6 +55,7 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -188,7 +190,6 @@ public class KStreamImplTest {
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null).processors().size());
}
- @SuppressWarnings("deprecation")
@Test
public void shouldPreserveSerdesForOperators() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -264,28 +265,28 @@ public class KStreamImplTest {
assertEquals(((AbstractStream) stream1.groupByKey()).keySerde(), consumedInternal.keySerde());
assertEquals(((AbstractStream) stream1.groupByKey()).valueSerde(), consumedInternal.valueSerde());
- assertEquals(((AbstractStream) stream1.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
- assertEquals(((AbstractStream) stream1.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+ assertEquals(((AbstractStream) stream1.groupByKey(Grouped.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) stream1.groupByKey(Grouped.with(mySerde, mySerde))).valueSerde(), mySerde);
assertEquals(((AbstractStream) stream1.groupBy(selector)).keySerde(), null);
assertEquals(((AbstractStream) stream1.groupBy(selector)).valueSerde(), consumedInternal.valueSerde());
- assertEquals(((AbstractStream) stream1.groupBy(selector, org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
- assertEquals(((AbstractStream) stream1.groupBy(selector, org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+ assertEquals(((AbstractStream) stream1.groupBy(selector, Grouped.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) stream1.groupBy(selector, Grouped.with(mySerde, mySerde))).valueSerde(), mySerde);
- assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L))).keySerde(), null);
- assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null);
- assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
- assertNull(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+ assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).keySerde(), null);
+ assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).valueSerde(), null);
+ assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+ assertNull(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
- assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L))).keySerde(), null);
- assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null);
- assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
- assertNull(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+ assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).keySerde(), null);
+ assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).valueSerde(), null);
+ assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+ assertNull(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
- assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L))).keySerde(), null);
- assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null);
- assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
- assertNull(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+ assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).keySerde(), null);
+ assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).valueSerde(), null);
+ assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+ assertNull(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
assertEquals(((AbstractStream) stream1.join(table1, joiner)).keySerde(), consumedInternal.keySerde());
assertEquals(((AbstractStream) stream1.join(table1, joiner)).valueSerde(), null);
@@ -384,7 +385,7 @@ public class KStreamImplTest {
});
stream.join(kStream,
valueJoiner,
- JoinWindows.of(ofMillis(windowSize)).until(3 * windowSize),
+ JoinWindows.of(ofMillis(windowSize)).grace(ofMillis(3 * windowSize)),
Joined.with(Serdes.String(),
Serdes.String(),
Serdes.String()))
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index b82c8fe..068062b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -260,7 +260,7 @@ public class KStreamWindowAggregateTest {
final String topic = "topic";
final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
- stream1.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+ stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100))
.aggregate(
() -> "",
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index 790c563..634cb2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -51,14 +52,13 @@ public class KStreamWindowReduceTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
- @SuppressWarnings("deprecation")
@Test
public void shouldLogAndMeterOnNullKey() {
final StreamsBuilder builder = new StreamsBuilder();
builder
.stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
- .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(500L)))
.reduce((value1, value2) -> value1 + "+" + value2);
@@ -80,7 +80,7 @@ public class KStreamWindowReduceTest {
final StreamsBuilder builder = new StreamsBuilder();
builder
.stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
- .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(5L)).until(100))
.reduce((value1, value2) -> value1 + "+" + value2)
.toStream()
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 01b2609..0452c06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -55,8 +56,7 @@ public class KTableAggregateTest {
private final Serde<String> stringSerde = Serdes.String();
private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
- private final org.apache.kafka.streams.kstream.Serialized<String, String> stringSerialzied =
- org.apache.kafka.streams.kstream.Serialized.with(stringSerde, stringSerde);
+ private final Grouped<String, String> stringSerialzied = Grouped.with(stringSerde, stringSerde);
private final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
private File stateDir = null;
@@ -362,7 +362,7 @@ public class KTableAggregateTest {
public KeyValue<String, Long> apply(final Long key, final String value) {
return new KeyValue<>(value, key);
}
- }, org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.Long()))
+ }, Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(new Reducer<Long>() {
@Override
public Long apply(final Long value1, final Long value2) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 5747fed..d0ed50b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
@@ -135,7 +136,6 @@ public class KTableImplTest {
}
@Test
- @SuppressWarnings("deprecation")
public void shouldPreserveSerdesForOperators() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> table1 = builder.table("topic-2", stringConsumed);
@@ -184,8 +184,8 @@ public class KTableImplTest {
assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde(), null);
assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde(), null);
- assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
- assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+ assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(), mySerde);
assertEquals(((AbstractStream) table1.join(table1, joiner)).keySerde(), consumedInternal.keySerde());
assertEquals(((AbstractStream) table1.join(table1, joiner)).valueSerde(), null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 1cd360a..f5d74b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -18,19 +18,17 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
@@ -344,7 +342,6 @@ public class KTableKTableLeftJoinTest {
* It is based on a fairly complicated join used by the developer that reported the bug.
* Before the fix this would trigger an IllegalStateException.
*/
- @SuppressWarnings("deprecation")
@Test
public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() {
final String agg = "agg";
@@ -360,16 +357,8 @@ public class KTableKTableLeftJoinTest {
final Consumed<Long, String> consumed = Consumed.with(Serdes.Long(), Serdes.String());
final KTable<Long, String> aggTable = builder
.table(agg, consumed)
- .groupBy(
- new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
- @Override
- public KeyValue<Long, String> apply(final Long key, final String value) {
- return new KeyValue<>(key, value);
- }
- },
- org.apache.kafka.streams.kstream.Serialized.with(Serdes.Long(), Serdes.String())
- )
- .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("agg-store"));
+ .groupBy(KeyValue::new, Grouped.with(Serdes.Long(), Serdes.String()))
+ .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, Materialized.as("agg-store"));
final KTable<Long, String> one = builder.table(tableOne, consumed);
final KTable<Long, String> two = builder.table(tableTwo, consumed);
@@ -378,12 +367,8 @@ public class KTableKTableLeftJoinTest {
final KTable<Long, String> five = builder.table(tableFive, consumed);
final KTable<Long, String> six = builder.table(tableSix, consumed);
- final ValueMapper<String, String> mapper = new ValueMapper<String, String>() {
- @Override
- public String apply(final String value) {
- return value.toUpperCase(Locale.ROOT);
- }
- };
+ final ValueMapper<String, String> mapper = value -> value.toUpperCase(Locale.ROOT);
+
final KTable<Long, String> seven = one.mapValues(mapper);
final KTable<Long, String> eight = six.leftJoin(seven, MockValueJoiner.TOSTRING_JOINER);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 82ada52..350b0d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -359,7 +360,6 @@ public class KTableTransformValuesTest {
assertThat(keyValueStore.get("C"), is("C->null!"));
}
- @SuppressWarnings("deprecation")
@Test
public void shouldCalculateCorrectOldValuesIfMaterializedEvenIfStateful() {
builder
@@ -369,7 +369,7 @@ public class KTableTransformValuesTest {
Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(QUERYABLE_NAME)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Integer()))
- .groupBy(toForceSendingOfOldValues(), org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.Integer()))
+ .groupBy(toForceSendingOfOldValues(), Grouped.with(Serdes.String(), Serdes.Integer()))
.reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR)
.mapValues(mapBackToStrings())
.toStream()
@@ -387,13 +387,12 @@ public class KTableTransformValuesTest {
assertThat(keyValueStore.get("A"), is(3));
}
- @SuppressWarnings("deprecation")
@Test
public void shouldCalculateCorrectOldValuesIfNotStatefulEvenIfNotMaterialized() {
builder
.table(INPUT_TOPIC, CONSUMED)
.transformValues(new StatelessTransformerSupplier())
- .groupBy(toForceSendingOfOldValues(), org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.Integer()))
+ .groupBy(toForceSendingOfOldValues(), Grouped.with(Serdes.String(), Serdes.Integer()))
.reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR)
.mapValues(mapBackToStrings())
.toStream()
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 749d5d6..57d7b14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
@@ -65,11 +66,10 @@ public class SessionWindowedKStreamImplTest {
};
private SessionWindowedKStream<String, String> stream;
- @SuppressWarnings("deprecation")
@Before
public void before() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
- this.stream = stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+ this.stream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(ofMillis(500)));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 57b860e..b0b87be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -359,7 +359,7 @@ public class SuppressScenarioTest {
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
- .windowedBy(TimeWindows.of(2L).grace(ofMillis(1L)))
+ .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(1L)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled());
valueCounts
.suppress(untilWindowCloses(unbounded()))
@@ -410,7 +410,7 @@ public class SuppressScenarioTest {
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
- .windowedBy(TimeWindows.of(2L).grace(ofMillis(2L)))
+ .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(2L)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled().withKeySerde(STRING_SERDE));
valueCounts
.suppress(untilWindowCloses(unbounded()))
@@ -459,14 +459,13 @@ public class SuppressScenarioTest {
}
}
- @SuppressWarnings("deprecation")
@Test
public void shouldSupportFinalResultsForSessionWindows() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
- .windowedBy(SessionWindows.with(5L).grace(ofMillis(5L)))
+ .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(5L)))
.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("counts").withCachingDisabled());
valueCounts
.suppress(untilWindowCloses(unbounded()))
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index c67b9e6..ce0f25e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
@@ -58,11 +59,10 @@ public class TimeWindowedKStreamImplTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private TimeWindowedKStream<String, String> windowedStream;
- @SuppressWarnings("deprecation")
@Before
public void before() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
- windowedStream = stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+ windowedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(500L)));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index de1a621..ec7f2fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -251,7 +252,6 @@ public class YahooBenchmark {
}
}
- @SuppressWarnings("deprecation")
private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic,
final CountDownLatch latch, final int numRecords) {
final Map<String, Object> serdeProps = new HashMap<>();
@@ -333,7 +333,7 @@ public class YahooBenchmark {
// calculate windowed counts
keyedByCampaign
- .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofMillis(10 * 1000)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windows"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 623c3e3..47f74d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.io.IOException;
@@ -46,7 +47,6 @@ public class BrokerCompatibilityTest {
private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
private static final String SINK_TOPIC = "brokerCompatibilitySinkTopic";
- @SuppressWarnings("deprecation")
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("BrokerCompatibilityTest are expecting two parameters: propFile, eosEnabled; but only see " + args.length + " parameter");
@@ -83,7 +83,7 @@ public class BrokerCompatibilityTest {
final StreamsBuilder builder = new StreamsBuilder();
- builder.<String, String>stream(SOURCE_TOPIC).groupByKey(org.apache.kafka.streams.kstream.Serialized.with(stringSerde, stringSerde))
+ builder.<String, String>stream(SOURCE_TOPIC).groupByKey(Grouped.with(stringSerde, stringSerde))
.count()
.toStream()
.mapValues(new ValueMapper<Long, String>() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index ef908f4..a396ad1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
@@ -110,7 +111,6 @@ public class SmokeTestClient extends SmokeTestUtil {
return fullProps;
}
- @SuppressWarnings("deprecation")
private static KafkaStreams createKafkaStreams(final Properties props) {
final StreamsBuilder builder = new StreamsBuilder();
final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
@@ -125,8 +125,7 @@ public class SmokeTestClient extends SmokeTestUtil {
data.process(SmokeTestUtil.printProcessorSupplier("data"));
// min
- final KGroupedStream<String, Integer> groupedData =
- data.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(stringSerde, intSerde));
+ final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
groupedData
.windowedBy(TimeWindows.of(Duration.ofDays(1)))
@@ -239,7 +238,7 @@ public class SmokeTestClient extends SmokeTestUtil {
// test repartition
final Agg agg = new Agg();
- cntTable.groupBy(agg.selector(), org.apache.kafka.streams.kstream.Serialized.with(stringSerde, longSerde))
+ cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde))
.aggregate(agg.init(), agg.adder(), agg.remover(),
Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
.withKeySerde(Serdes.String())