You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/21 17:03:59 UTC
[kafka] branch 2.0 updated: MINOR: update web docs and examples of
Streams with Java8 syntax (#5249)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 82ba5c0 MINOR: update web docs and examples of Streams with Java8 syntax (#5249)
82ba5c0 is described below
commit 82ba5c0ad65f089f9633f2fee926e77d125c90eb
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Jun 21 10:02:58 2018 -0700
MINOR: update web docs and examples of Streams with Java8 syntax (#5249)
Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@confluent.io>
---
docs/streams/developer-guide/dsl-api.html | 10 +--
docs/streams/developer-guide/testing.html | 25 ++-----
.../examples/pageview/PageViewTypedDemo.java | 84 +++++++++++-----------
.../examples/pageview/PageViewUntypedDemo.java | 54 +++++---------
.../kafka/streams/examples/pipe/PipeDemo.java | 2 +-
.../examples/temperature/TemperatureDemo.java | 32 +++------
.../streams/examples/wordcount/WordCountDemo.java | 18 +----
.../examples/wordcount/WordCountProcessorDemo.java | 22 +++---
.../src/main/java/LineSplit.java | 14 ----
.../src/main/java/WordCount.java | 22 ------
10 files changed, 88 insertions(+), 195 deletions(-)
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index cd3a965..beb83a3 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -898,9 +898,9 @@
<span class="c1">// Aggregating with time-based windowing (here: with 5-minute tumbling windows)</span>
<span class="n">KTable</span><span class="o"><</span><span class="n">Windowed</span><span class="o"><</span><span class="n">String</span><span class="o">>,</span> <span class="n">Long</span><span class="o">></span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na [...]
<span class="o">.</span><span class="na">aggregate</span><span class="o">(</span>
- <span class="o">()</span> <span class="o">-></span> <span class="mi">0</span><span class="n">L</span><span class="o">,</span> <span class="cm">/* initializer */</span>
- <span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">newValue</span><span class="o">,</span> <span class="n">aggValue</span><span class="o">)</span> <span class="o">-></span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span><span class="o">,</span> <span class="cm">/* adder */</span>
- <span class="n">Materialized</span><span class="o">.<</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">WindowStore</span><span class="o"><</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]>></span><span class="n">as</span><span class="o">(</span><span class="s">"time-windowed-aggregated-stream-store"</span><span class="o">)</sp [...]
+ <span class="o">()</span> <span class="o">-></span> <span class="mi">0</span><span class="n">L</span><span class="o">,</span> <span class="cm">/* initializer */</span>
+ <span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">newValue</span><span class="o">,</span> <span class="n">aggValue</span><span class="o">)</span> <span class="o">-></span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span><span class="o">,</span> <span class="cm">/* adder */</span>
+ <span class="n">Materialized</span><span class="o">.<</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">WindowStore</span><span class="o"><</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]>></span><span class="n">as</span><span class="o">(</span><span class="s">"time-windowed-aggregated-stream-store"</span><span class="o">)</ [...]
<span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span> <span class="cm">/* serde for aggregate value */</span>
<span class="c1">// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)</span>
@@ -908,8 +908,8 @@
<span class="n">aggregate</span><span class="o">(</span>
<span class="o">()</span> <span class="o">-></span> <span class="mi">0</span><span class="n">L</span><span class="o">,</span> <span class="cm">/* initializer */</span>
<span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">newValue</span><span class="o">,</span> <span class="n">aggValue</span><span class="o">)</span> <span class="o">-></span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span><span class="o">,</span> <span class="cm">/* adder */</span>
- <span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">leftAggValue</span><span class="o">,</span> <span class="n">rightAggValue</span><span class="o">)</span> <span class="o">-></span> <span class="n">leftAggValue</span> <span class="o">+</span> <span class="n">rightAggValue</span><span class="o">,</span> <span class="cm">/* session merger */</span>
- <span class="n">Materialized</span><span class="o">.<</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">SessionStore</span><span class="o"><</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]>></span><span class="n">as</span><span class="o">(</span><span class="s">"sessionized-aggregated-stream-store"</span><span class="o">)</span [...]
+ <span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">leftAggValue</span><span class="o">,</span> <span class="n">rightAggValue</span><span class="o">)</span> <span class="o">-></span> <span class="n">leftAggValue</span> <span class="o">+</span> <span class="n">rightAggValue</span><span class="o">,</span> <span class="cm">/* session merger */</span>
+ <span class="n">Materialized</span><span class="o">.<</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">SessionStore</span><span class="o"><</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]>></span><span class="n">as</span><span class="o">(</span><span class="s">"sessionized-aggregated-stream-store"</span><span class="o">)</s [...]
<span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span> <span class="cm">/* serde for aggregate value */</span>
<span class="c1">// Java 7 examples</span>
diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html
index 92d8fce..bdecc43 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -255,18 +255,8 @@ public class CustomMaxAggregator implements Processor<String, Long> {
@Override
public void init(ProcessorContext context) {
this.context = context;
- context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
- @Override
- public void punctuate(long timestamp) {
- flushStore();
- }
- });
- context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
- @Override
- public void punctuate(long timestamp) {
- flushStore();
- }
- });
+ context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
+ context.schedule(10000, PunctuationType.STREAM_TIME, time -> flushStore());
store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
}
@@ -287,9 +277,6 @@ public class CustomMaxAggregator implements Processor<String, Long> {
}
@Override
- public void punctuate(long timestamp) {} // deprecated; not used
-
- @Override
public void close() {}
}
</pre>
@@ -407,12 +394,8 @@ punctuator.punctuate(/*timestamp*/ 0L);
</div>
</div>
<div class="pagination">
- <div class="pagination">
- <a href="/{{version}}/documentation/streams/developer-guide/datatypes"
- class="pagination__btn pagination__btn__prev">Previous</a>
- <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries"
- class="pagination__btn pagination__btn__next">Next</a>
- </div>
+ <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__prev">Previous</a>
+ <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index bd24e84..234d3fc 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -28,16 +28,14 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
@@ -83,7 +81,7 @@ public class PageViewTypedDemo {
public String region;
}
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -151,56 +149,56 @@ public class PageViewTypedDemo {
Consumed.with(Serdes.String(), userProfileSerde));
KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
- .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
- @Override
- public PageViewByRegion apply(PageView view, UserProfile profile) {
- PageViewByRegion viewByRegion = new PageViewByRegion();
- viewByRegion.user = view.user;
- viewByRegion.page = view.page;
-
- if (profile != null) {
- viewByRegion.region = profile.region;
- } else {
- viewByRegion.region = "UNKNOWN";
- }
- return viewByRegion;
- }
- })
- .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
- @Override
- public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
- return new KeyValue<>(viewRegion.region, viewRegion);
+ .leftJoin(users, (view, profile) -> {
+ PageViewByRegion viewByRegion = new PageViewByRegion();
+ viewByRegion.user = view.user;
+ viewByRegion.page = view.page;
+
+ if (profile != null) {
+ viewByRegion.region = profile.region;
+ } else {
+ viewByRegion.region = "UNKNOWN";
}
+ return viewByRegion;
})
+ .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
.groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
.count()
.toStream()
- .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
- @Override
- public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
- WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
- wViewByRegion.windowStart = key.window().start();
- wViewByRegion.region = key.key();
-
- RegionCount rCount = new RegionCount();
- rCount.region = key.key();
- rCount.count = value;
-
- return new KeyValue<>(wViewByRegion, rCount);
- }
+ .map((key, value) -> {
+ WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+ wViewByRegion.windowStart = key.window().start();
+ wViewByRegion.region = key.key();
+
+ RegionCount rCount = new RegionCount();
+ rCount.region = key.key();
+ rCount.count = value;
+
+ return new KeyValue<>(wViewByRegion, rCount);
});
// write to the result topic
regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
- streams.start();
-
- // usually the stream application would be running forever,
- // in this example we just let it run for some time and stop since the input data is finite.
- Thread.sleep(5000L);
-
- streams.close();
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // attach shutdown handler to catch control-c
+ Runtime.getRuntime().addShutdownHook(new Thread("streams-pipe-shutdown-hook") {
+ @Override
+ public void run() {
+ streams.close();
+ latch.countDown();
+ }
+ });
+
+ try {
+ streams.start();
+ latch.await();
+ } catch (Throwable e) {
+ System.exit(1);
+ }
+ System.exit(0);
}
}
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index c38d685..dddb542 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -33,13 +33,9 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;
import java.util.Properties;
@@ -79,46 +75,30 @@ public class PageViewUntypedDemo {
KTable<String, JsonNode> users = builder.table("streams-userprofile-input", consumed);
- KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
- @Override
- public String apply(JsonNode record) {
- return record.get("region").textValue();
- }
- });
+ KTable<String, String> userRegions = users.mapValues(record -> record.get("region").textValue());
KStream<JsonNode, JsonNode> regionCount = views
- .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
- @Override
- public JsonNode apply(JsonNode view, String region) {
- ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
- return jNode.put("user", view.get("user").textValue())
- .put("page", view.get("page").textValue())
- .put("region", region == null ? "UNKNOWN" : region);
- }
- })
- .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
- @Override
- public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
- return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
- }
+ .leftJoin(userRegions, (view, region) -> {
+ ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+ return (JsonNode) jNode.put("user", view.get("user").textValue())
+ .put("page", view.get("page").textValue())
+ .put("region", region == null ? "UNKNOWN" : region);
+
})
+ .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
.count()
.toStream()
- .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
- @Override
- public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
- ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
- keyNode.put("window-start", key.window().start())
- .put("region", key.key());
-
- ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
- valueNode.put("count", value);
-
- return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
- }
+ .map((key, value) -> {
+ ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+ keyNode.put("window-start", key.window().start())
+ .put("region", key.key());
+
+ ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+ valueNode.put("count", value);
+
+ return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
});
// write to the result topic
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
index 5389877..d61e174 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -38,7 +38,7 @@ import java.util.concurrent.CountDownLatch;
*/
public class PipeDemo {
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index c5eb5f9..4607d75 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -23,10 +23,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
@@ -71,7 +68,7 @@ public class TemperatureDemo {
// window size within which the filtering is applied
private static final int TEMPERATURE_WINDOW_SIZE = 5;
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature");
@@ -89,30 +86,17 @@ public class TemperatureDemo {
KStream<Windowed<String>, String> max = source
// temperature values are sent without a key (null), so in order
// to group and reduce them, a key is needed ("temp" has been chosen)
- .selectKey(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(String key, String value) {
- return "temp";
- }
- })
+ .selectKey((key, value) -> "temp")
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
- .reduce(new Reducer<String>() {
- @Override
- public String apply(String value1, String value2) {
- if (Integer.parseInt(value1) > Integer.parseInt(value2))
- return value1;
- else
- return value2;
- }
+ .reduce((value1, value2) -> {
+ if (Integer.parseInt(value1) > Integer.parseInt(value2))
+ return value1;
+ else
+ return value2;
})
.toStream()
- .filter(new Predicate<Windowed<String>, String>() {
- @Override
- public boolean test(Windowed<String> key, String value) {
- return Integer.parseInt(value) > TEMPERATURE_THRESHOLD;
- }
- });
+ .filter((key, value) -> Integer.parseInt(value) > TEMPERATURE_THRESHOLD);
Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 7535315..4f0150e 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -23,9 +23,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.Arrays;
import java.util.Locale;
@@ -46,7 +44,7 @@ import java.util.concurrent.CountDownLatch;
*/
public class WordCountDemo {
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -64,18 +62,8 @@ public class WordCountDemo {
KStream<String, String> source = builder.stream("streams-plaintext-input");
KTable<String, Long> counts = source
- .flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(String value) {
- return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
- }
- })
- .groupBy(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(String key, String value) {
- return value;
- }
- })
+ .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
+ .groupBy((key, value) -> value)
.count();
// need to override value serde to Long type
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 523bb46..86feaeb 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
@@ -61,19 +60,16 @@ public class WordCountProcessorDemo {
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.context = context;
- this.context.schedule(1000, PunctuationType.STREAM_TIME, new Punctuator() {
- @Override
- public void punctuate(long timestamp) {
- try (KeyValueIterator<String, Integer> iter = kvStore.all()) {
- System.out.println("----------- " + timestamp + " ----------- ");
+ this.context.schedule(1000, PunctuationType.STREAM_TIME, timestamp -> {
+ try (KeyValueIterator<String, Integer> iter = kvStore.all()) {
+ System.out.println("----------- " + timestamp + " ----------- ");
- while (iter.hasNext()) {
- KeyValue<String, Integer> entry = iter.next();
+ while (iter.hasNext()) {
+ KeyValue<String, Integer> entry = iter.next();
- System.out.println("[" + entry.key + ", " + entry.value + "]");
+ System.out.println("[" + entry.key + ", " + entry.value + "]");
- context.forward(entry.key, entry.value.toString());
- }
+ context.forward(entry.key, entry.value.toString());
}
}
});
@@ -103,7 +99,7 @@ public class WordCountProcessorDemo {
}
}
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -123,7 +119,7 @@ public class WordCountProcessorDemo {
Stores.inMemoryKeyValueStore("Counts"),
Serdes.String(),
Serdes.Integer()),
- "Process");
+ "Process");
builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java
index ec40d2a..bbf54e6 100644
--- a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java
@@ -44,24 +44,10 @@ public class LineSplit {
final StreamsBuilder builder = new StreamsBuilder();
- builder.<String, String>stream("streams-plaintext-input")
- .flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(String value) {
- return Arrays.asList(value.split("\\W+"));
- }
- })
- .to("streams-linesplit-output");
-
- /* ------- use the code below for Java 8 and uncomment the above ----
-
builder.stream("streams-plaintext-input")
.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.to("streams-linesplit-output");
- ----------------------------------------------------------------- */
-
-
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
index 020eb03..bdbefed 100644
--- a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -51,34 +51,12 @@ public class WordCount {
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("streams-plaintext-input")
- .flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(String value) {
- return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
- }
- })
- .groupBy(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(String key, String value) {
- return value;
- }
- })
- .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
- .toStream()
- .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
-
-
- /* ------- use the code below for Java 8 and comment the above ----
-
- builder.<String, String>stream("streams-plaintext-input")
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
- ----------------------------------------------------------------- */
-
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);