You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/09 01:26:06 UTC
[25/33] samza-hello-samza git commit: SAMZA-1441: Updated High Level API examples in hello-samza to provide serdes in code.
SAMZA-1441: Updated High Level API examples in hello-samza to provide serdes in code.
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Author: Yan Fang <ya...@gmail.com>
Author: Prateek Maheshwari <pm...@linkedin.com>
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Author: Aleksandar Pejakovic <a....@levi9.com>
Author: Navina Ramesh <nr...@linkedin.com>
Author: vjagadish1989 <jv...@linkedin.com>
Author: Steven Aerts <st...@gmail.com>
Author: Chris Riccomini <cr...@apache.org>
Author: Manikumar Reddy <ma...@gmail.com>
Author: Yi Pan <ni...@gmail.com>
Author: Yan Fang <ya...@apache.org>
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Author: Stanislav Los <sl...@gmail.com>
Author: Ken Gidley <kg...@yahoo.com>
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Jagadish Venkatraman <ja...@apache.org>
Closes #24 from prateekm/serde-instance
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/e5943a00
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/e5943a00
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/e5943a00
Branch: refs/heads/master
Commit: e5943a000eef87e077c422e09dc20f09d4e876ca
Parents: 901c3a3
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Wed Oct 4 16:08:49 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Oct 4 16:08:49 2017 -0700
----------------------------------------------------------------------
.../config/pageview-adclick-joiner.properties | 18 +---
src/main/config/pageview-filter.properties | 18 +---
src/main/config/pageview-sessionizer.properties | 18 +---
.../config/tumbling-pageview-counter.properties | 20 +---
...ikipedia-application-local-runner.properties | 2 -
.../config/wikipedia-application.properties | 4 +-
.../java/samza/examples/cookbook/AdClick.java | 58 ----------
.../java/samza/examples/cookbook/PageView.java | 61 -----------
.../cookbook/PageViewAdClickJoiner.java | 108 +++++++++++--------
.../examples/cookbook/PageViewFilterApp.java | 47 ++++----
.../cookbook/PageViewSessionizerApp.java | 54 +++++-----
.../cookbook/TumblingPageViewCounterApp.java | 52 +++++----
.../samza/examples/cookbook/data/AdClick.java | 54 ++++++++++
.../samza/examples/cookbook/data/PageView.java | 46 ++++++++
.../examples/cookbook/data/UserPageViews.java | 51 +++++++++
.../application/WikipediaApplication.java | 98 ++++++++++-------
16 files changed, 371 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/pageview-adclick-joiner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-adclick-joiner.properties b/src/main/config/pageview-adclick-joiner.properties
index 81ec3f6..eba7b0b 100644
--- a/src/main/config/pageview-adclick-joiner.properties
+++ b/src/main/config/pageview-adclick-joiner.properties
@@ -18,29 +18,19 @@
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=pageview-adclick-joiner
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
app.class=samza.examples.cookbook.PageViewAdClickJoiner
-task.inputs=kafka.pageview-join-input,kafka.adclick-join-input
task.window.ms=2000
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=string
-systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
-job.default.system=kafka
-job.container.count=2
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/pageview-filter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-filter.properties b/src/main/config/pageview-filter.properties
index b9e8d2a..331ee1a 100644
--- a/src/main/config/pageview-filter.properties
+++ b/src/main/config/pageview-filter.properties
@@ -18,29 +18,19 @@
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=pageview-filter
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
app.class=samza.examples.cookbook.PageViewFilterApp
-task.inputs=kafka.pageview-filter-input
task.window.ms=2000
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=string
-systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
-job.default.system=kafka
-job.container.count=2
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/pageview-sessionizer.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-sessionizer.properties b/src/main/config/pageview-sessionizer.properties
index 847aa87..420cdde 100644
--- a/src/main/config/pageview-sessionizer.properties
+++ b/src/main/config/pageview-sessionizer.properties
@@ -18,29 +18,19 @@
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=pageview-sessionizer
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
app.class=samza.examples.cookbook.PageViewSessionizerApp
-task.inputs=kafka.pageview-session-input
task.window.ms=2000
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=string
-systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
-job.default.system=kafka
-job.container.count=2
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/tumbling-pageview-counter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/tumbling-pageview-counter.properties b/src/main/config/tumbling-pageview-counter.properties
index 09fb131..b58dbe9 100644
--- a/src/main/config/tumbling-pageview-counter.properties
+++ b/src/main/config/tumbling-pageview-counter.properties
@@ -18,29 +18,19 @@
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=tumbling-pageview-counter
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
app.class=samza.examples.cookbook.TumblingPageViewCounterApp
-task.inputs=kafka.pageview-tumbling-input
task.window.ms=2000
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=string
-systems.kafka.samza.key.serde=string
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
-job.default.system=kafka
-job.container.count=2
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/wikipedia-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties
index 1911e68..b770f13 100644
--- a/src/main/config/wikipedia-application-local-runner.properties
+++ b/src/main/config/wikipedia-application-local-runner.properties
@@ -25,7 +25,6 @@ job.coordinator.zk.connect=localhost:2181
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
@@ -39,7 +38,6 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1
-systems.kafka.default.stream.samza.msg.serde=json
# Streams
streams.en-wikipedia.samza.system=wikipedia
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/wikipedia-application.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application.properties b/src/main/config/wikipedia-application.properties
index aeb8069..841fcc5 100644
--- a/src/main/config/wikipedia-application.properties
+++ b/src/main/config/wikipedia-application.properties
@@ -33,13 +33,11 @@ systems.wikipedia.port=6667
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1
-systems.kafka.default.stream.samza.msg.serde=json
# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/AdClick.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java
deleted file mode 100644
index 2d15cec..0000000
--- a/src/main/java/samza/examples/cookbook/AdClick.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.cookbook;
-
-/**
- * Represents an ad click event.
- */
-public class AdClick {
- /*
- * An unique identifier for the ad
- */
- private final String adId;
- /**
- * The user that clicked the ad
- */
- private final String userId;
- /**
- * The id of the page that the ad was served from
- */
- private final String pageId;
-
- public AdClick(String message) {
- String[] adClickFields = message.split(",");
- this.adId = adClickFields[0];
- this.userId = adClickFields[1];
- this.pageId = adClickFields[2];
- }
-
- public String getAdId() {
- return adId;
- }
-
- public String getUserId() {
- return userId;
- }
-
- public String getPageId() {
- return pageId;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageView.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java
deleted file mode 100644
index 7803db7..0000000
--- a/src/main/java/samza/examples/cookbook/PageView.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package samza.examples.cookbook;
-
-/**
- * Represents a Page view event
- */
-class PageView {
- /**
- * The user that viewed the page
- */
- private final String userId;
- /**
- * The region that the page was viewed from
- */
- private final String country;
- /**
- * A trackingId for the page
- */
- private final String pageId;
-
- /**
- * Constructs a {@link PageView} from the provided string.
- *
- * @param message in the following CSV format - userId,country,url
- */
- PageView(String message) {
- String[] pageViewFields = message.split(",");
- userId = pageViewFields[0];
- country = pageViewFields[1];
- pageId = pageViewFields[2];
- }
-
- String getUserId() {
- return userId;
- }
-
- String getCountry() {
- return country;
- }
-
- String getPageId() {
- return pageId;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
index 94c7bc3..4f491f7 100644
--- a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
+++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
@@ -20,15 +20,18 @@ package samza.examples.cookbook;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.JoinFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import samza.examples.cookbook.data.AdClick;
+import samza.examples.cookbook.data.PageView;
import java.time.Duration;
-import java.util.function.Function;
/**
* In this example, we join a stream of Page views with a stream of Ad clicks. For instance, this is helpful for
@@ -41,75 +44,94 @@ import java.util.function.Function;
* <ol>
* <li>
* Ensure that the topics "pageview-join-input", "adclick-join-input" are created <br/>
- * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1 <br/>
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic adclick-join-input --partitions 2 --replication-factor 1
* </li>
* <li>
- * Run the application using the ./bin/run-app.sh script <br/>
- * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
- * --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties)
+ * Run the application using the run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties
* </li>
* <li>
* Produce some messages to the "pageview-join-input" topic <br/>
* ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/>
- * user1,india,google.com <br/>
- * user2,china,yahoo.com
+ * {"userId": "user1", "country": "india", "pageId":"google.com"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com"}
* </li>
* <li>
* Produce some messages to the "adclick-join-input" topic with the same pageKey <br/>
* ./deploy/kafka/bin/kafka-console-producer.sh --topic adclick-join-input --broker-list localhost:9092 <br/>
- * adClickId1,user1,google.com <br/>
- * adClickId2,user1,yahoo.com
+ * {"userId": "user1", "adId": "adClickId1", "pageId":"google.com"} <br/>
+ * {"userId": "user1", "adId": "adClickId2", "pageId":"yahoo.com"}
* </li>
* <li>
- * Consume messages from the "pageview-adclick-join-output" topic (e.g. bin/kafka-console-consumer.sh)
- * ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output <br/>
- * --property print.key=true
+ * Consume messages from the "pageview-adclick-join-output" topic <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output --property print.key=true
* </li>
* </ol>
*
*/
public class PageViewAdClickJoiner implements StreamApplication {
- private static final Logger LOG = LoggerFactory.getLogger(PageViewAdClickJoiner.class);
- private static final String INPUT_TOPIC1 = "pageview-join-input";
- private static final String INPUT_TOPIC2 = "adclick-join-input";
-
+ private static final String PAGEVIEW_TOPIC = "pageview-join-input";
+ private static final String AD_CLICK_TOPIC = "adclick-join-input";
private static final String OUTPUT_TOPIC = "pageview-adclick-join-output";
@Override
public void init(StreamGraph graph, Config config) {
+ StringSerde stringSerde = new StringSerde();
+ JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
+ JsonSerdeV2<AdClick> adClickSerde = new JsonSerdeV2<>(AdClick.class);
+ JsonSerdeV2<JoinResult> joinResultSerde = new JsonSerdeV2<>(JoinResult.class);
+
+ MessageStream<PageView> pageViews = graph.getInputStream(PAGEVIEW_TOPIC, pageViewSerde);
+ MessageStream<AdClick> adClicks = graph.getInputStream(AD_CLICK_TOPIC, adClickSerde);
+ OutputStream<JoinResult> joinResults = graph.getOutputStream(OUTPUT_TOPIC, joinResultSerde);
- MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC1, (k, v) -> v);
- MessageStream<String> adClicks = graph.<String, String, String>getInputStream(INPUT_TOPIC2, (k, v) -> v);
+ JoinFunction<String, PageView, AdClick, JoinResult> pageViewAdClickJoinFunction =
+ new JoinFunction<String, PageView, AdClick, JoinResult>() {
+ @Override
+ public JoinResult apply(PageView pageView, AdClick adClick) {
+ return new JoinResult(pageView.pageId, pageView.userId, pageView.country, adClick.getAdId());
+ }
- OutputStream<String, String, String> outputStream = graph
- .getOutputStream(OUTPUT_TOPIC, m -> "", m -> m);
+ @Override
+ public String getFirstKey(PageView pageView) {
+ return pageView.pageId;
+ }
- Function<String, String> pageViewKeyFn = pageView -> new PageView(pageView).getPageId();
- Function<String, String> adClickKeyFn = adClick -> new AdClick(adClick).getPageId();
+ @Override
+ public String getSecondKey(AdClick adClick) {
+ return adClick.getPageId();
+ }
+ };
- MessageStream<String> pageViewRepartitioned = pageViews.partitionBy(pageViewKeyFn);
- MessageStream<String> adClickRepartitioned = adClicks.partitionBy(adClickKeyFn);
+ MessageStream<PageView> repartitionedPageViews =
+ pageViews
+ .partitionBy(pv -> pv.pageId, pv -> pv, KVSerde.of(stringSerde, pageViewSerde))
+ .map(KV::getValue);
- pageViewRepartitioned.join(adClickRepartitioned, new JoinFunction<String, String, String, String>() {
+ MessageStream<AdClick> repartitionedAdClicks =
+ adClicks
+ .partitionBy(AdClick::getPageId, ac -> ac, KVSerde.of(stringSerde, adClickSerde))
+ .map(KV::getValue);
- @Override
- public String apply(String pageViewMsg, String adClickMsg) {
- PageView pageView = new PageView(pageViewMsg);
- AdClick adClick = new AdClick(adClickMsg);
- String joinResult = String.format("%s,%s,%s", pageView.getPageId(), pageView.getCountry(), adClick.getAdId());
- return joinResult;
- }
+ repartitionedPageViews
+ .join(repartitionedAdClicks, pageViewAdClickJoinFunction,
+ stringSerde, pageViewSerde, adClickSerde, Duration.ofMinutes(3))
+ .sendTo(joinResults);
+ }
- @Override
- public String getFirstKey(String msg) {
- return new PageView(msg).getPageId();
- }
+ static class JoinResult {
+ public String pageId;
+ public String userId;
+ public String country;
+ public String adId;
- @Override
- public String getSecondKey(String msg) {
- return new AdClick(msg).getPageId();
- }
- }, Duration.ofMinutes(3)).sendTo(outputStream);
+ public JoinResult(String pageId, String userId, String country, String adId) {
+ this.pageId = pageId;
+ this.userId = userId;
+ this.country = country;
+ this.adId = adId;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
index cb39553..80ce2d1 100644
--- a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
@@ -20,14 +20,14 @@ package samza.examples.cookbook;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.FilterFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.function.Function;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import samza.examples.cookbook.data.PageView;
/**
* In this example, we demonstrate re-partitioning a stream of page views and filtering out some bad events in the stream.
@@ -39,48 +39,41 @@ import java.util.function.Function;
* <ol>
* <li>
* Ensure that the topic "pageview-filter-input" is created <br/>
- * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1
* </li>
* <li>
- * Run the application using the ./bin/run-app.sh script <br/>
- * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
- * --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties)
+ * Run the application using the run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties
* </li>
* <li>
* Produce some messages to the "pageview-filter-input" topic <br/>
* ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-filter-input --broker-list localhost:9092 <br/>
- * user1,india,google.com <br/>
- * user2,china,yahoo.com
+ * {"userId": "user1", "country": "india", "pageId":"google.com"} <br/>
+ * {"userId": "invalidUserId", "country": "france", "pageId":"facebook.com"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com"}
* </li>
* <li>
* Consume messages from the "pageview-filter-output" topic (e.g. bin/kafka-console-consumer.sh)
- * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output <br/>
- * --property print.key=true </li>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output --property print.key=true
+ * </li>
* </ol>
- *
*/
public class PageViewFilterApp implements StreamApplication {
- private static final Logger LOG = LoggerFactory.getLogger(PageViewFilterApp.class);
- private static final String FILTER_KEY = "badKey";
private static final String INPUT_TOPIC = "pageview-filter-input";
private static final String OUTPUT_TOPIC = "pageview-filter-output";
+ private static final String INVALID_USER_ID = "invalidUserId";
@Override
public void init(StreamGraph graph, Config config) {
+ graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)));
- MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
-
- Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
-
- OutputStream<String, String, String> outputStream = graph
- .getOutputStream(OUTPUT_TOPIC, keyFn, m -> m);
-
- FilterFunction<String> filterFn = pageView -> !FILTER_KEY.equals(new PageView(pageView).getUserId());
+ MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC);
+ OutputStream<KV<String, PageView>> filteredPageViews = graph.getOutputStream(OUTPUT_TOPIC);
pageViews
- .partitionBy(keyFn)
- .filter(filterFn)
- .sendTo(outputStream);
+ .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+ .filter(kv -> !INVALID_USER_ID.equals(kv.value.userId))
+ .sendTo(filteredPageViews);
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
index 7ec4f9d..f1000ae 100644
--- a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
+++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
@@ -20,21 +20,22 @@ package samza.examples.cookbook;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import samza.examples.cookbook.data.PageView;
+import samza.examples.cookbook.data.UserPageViews;
import java.time.Duration;
-import java.util.Collection;
-import java.util.function.Function;
/**
* In this example, we group page views by userId into sessions, and compute the number of page views for each user
- * session. A session is considered closed when there is no user activity for a 3 second duration.
+ * session. A session is considered closed when there is no user activity for a 10 second duration.
*
* <p>Concepts covered: Using session windows to group data in a stream, Re-partitioning a stream.
*
@@ -43,45 +44,50 @@ import java.util.function.Function;
* <ol>
* <li>
* Ensure that the topic "pageview-session-input" is created <br/>
- * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1
* </li>
* <li>
- * Run the application using the ./bin/run-app.sh script <br/>
- * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
- * --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties)
+ * Run the application using the run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties
* </li>
* <li>
* Produce some messages to the "pageview-session-input" topic <br/>
- * user1,india,google.com <br/>
- * user2,china,yahoo.com
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-session-input --broker-list localhost:9092 <br/>
+ * {"userId": "user1", "country": "india", "pageId":"google.com/home"} <br/>
+ * {"userId": "user1", "country": "india", "pageId":"google.com/search"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com/home"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com/sports"} <br/>
+ * {"userId": "user1", "country": "india", "pageId":"google.com/news"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com/fashion"}
* </li>
* <li>
* Consume messages from the "pageview-session-output" topic (e.g. bin/kafka-console-consumer.sh)
- * ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-sessions-output <br/>
- * --property print.key=true
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-session-output --property print.key=true
* </li>
* </ol>
*
*/
public class PageViewSessionizerApp implements StreamApplication {
- private static final Logger LOG = LoggerFactory.getLogger(PageViewSessionizerApp.class);
private static final String INPUT_TOPIC = "pageview-session-input";
private static final String OUTPUT_TOPIC = "pageview-session-output";
@Override
public void init(StreamGraph graph, Config config) {
+ graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)));
- MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
-
- OutputStream<String, String, WindowPane<String, Collection<String>>> outputStream = graph
- .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString());
-
- Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
+ MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC);
+ OutputStream<KV<String, UserPageViews>> userPageViews =
+ graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)));
pageViews
- .partitionBy(keyFn)
- .window(Windows.keyedSessionWindow(keyFn, Duration.ofSeconds(3)))
- .sendTo(outputStream);
+ .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+ .window(Windows.keyedSessionWindow(kv -> kv.value.userId, Duration.ofSeconds(10)))
+ .map(windowPane -> {
+ String userId = windowPane.getKey().getKey();
+ int views = windowPane.getMessage().size();
+ return KV.of(userId, new UserPageViews(userId, views));
+ })
+ .sendTo(userPageViews);
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
index 1bc6ff4..0809180 100644
--- a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
+++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
@@ -20,18 +20,18 @@ package samza.examples.cookbook;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import samza.examples.cookbook.data.PageView;
+import samza.examples.cookbook.data.UserPageViews;
import java.time.Duration;
-import java.util.Collection;
-import java.util.function.Function;
/**
* In this example, we group a stream of page views by user id, and compute the number of page views over a tumbling time
@@ -46,45 +46,51 @@ import java.util.function.Function;
* <ol>
* <li>
* Ensure that the topic "pageview-tumbling-input" is created <br/>
- * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1
* </li>
* <li>
- * Run the application using the ./bin/run-app.sh script <br/>
- * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
- * --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties)
+ * Run the application using the run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties
* </li>
* <li>
- * Produce some messages to the "pageview-tumbling-input" topic <br/>
+ * Produce some messages to the "pageview-tumbling-input" topic, waiting for some time between messages <br/>
./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-tumbling-input --broker-list localhost:9092 <br/>
- user1,india,google.com <br/>
- * user2,china,yahoo.com
+ * {"userId": "user1", "country": "india", "pageId":"google.com/home"} <br/>
+ * {"userId": "user1", "country": "india", "pageId":"google.com/search"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com/home"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com/sports"} <br/>
+ * {"userId": "user1", "country": "india", "pageId":"google.com/news"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com/fashion"}
* </li>
* <li>
* Consume messages from the "pageview-tumbling-output" topic (e.g. bin/kafka-console-consumer.sh)
- * ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output --property print.key=true <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output --property print.key=true <br/>
* </li>
* </ol>
*
*/
public class TumblingPageViewCounterApp implements StreamApplication {
- private static final Logger LOG = LoggerFactory.getLogger(TumblingPageViewCounterApp.class);
private static final String INPUT_TOPIC = "pageview-tumbling-input";
private static final String OUTPUT_TOPIC = "pageview-tumbling-output";
@Override
public void init(StreamGraph graph, Config config) {
+ graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)));
- MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
-
- OutputStream<String, String, WindowPane<String, Integer>> outputStream = graph
- .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> m.getMessage().toString());
-
- Function<String, String> keyFn = pageView -> new PageView(pageView).getCountry();
+ MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC);
+ OutputStream<KV<String, UserPageViews>> outputStream =
+ graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)));
pageViews
- .partitionBy(keyFn)
- .window(Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(3), () -> 0, (m, prevCount) -> prevCount + 1))
+ .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+ .window(Windows.keyedTumblingWindow(
+ kv -> kv.key, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1))
+ .map(windowPane -> {
+ String userId = windowPane.getKey().getKey();
+ int views = windowPane.getMessage();
+ return KV.of(userId, new UserPageViews(userId, views));
+ })
.sendTo(outputStream);
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/data/AdClick.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/data/AdClick.java b/src/main/java/samza/examples/cookbook/data/AdClick.java
new file mode 100644
index 0000000..42d45dc
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/data/AdClick.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.cookbook.data;
+
+/**
+ * An ad click event.
+ *
+ * <p>Holds the id of the page the ad was clicked on, the id of the ad, and the id of the user who clicked it.
+ * All three fields are mutable and each is exposed through a plain getter/setter pair.
+ */
+public class AdClick {
+
+ private String pageId; // the unique id of the page that the ad was clicked on
+ private String adId; // a unique id for the ad
+ private String userId; // the id of the user that clicked the ad
+
+ public String getPageId() {
+ return pageId;
+ }
+
+ public void setPageId(String pageId) {
+ this.pageId = pageId;
+ }
+
+ public String getAdId() {
+ return adId;
+ }
+
+ public void setAdId(String adId) {
+ this.adId = adId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/data/PageView.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/data/PageView.java b/src/main/java/samza/examples/cookbook/data/PageView.java
new file mode 100644
index 0000000..9640694
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/data/PageView.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook.data;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A page view event.
+ *
+ * <p>All fields are public and final; they are populated once through the constructor, whose parameters are
+ * bound to the "pageId", "userId" and "country" JSON properties. The property names match the field names,
+ * so the JSON written for an instance can be read back through the same constructor.
+ */
+public class PageView {
+ public final String userId;
+ public final String country;
+ public final String pageId;
+
+ /**
+ * Constructs a page view event.
+ *
+ * @param pageId the id for the page that was viewed
+ * @param userId the user that viewed the page
+ * @param country the country that the page was viewed from; bound to the "country" JSON property so that it
+ * matches both the field name and the sample input messages, e.g.
+ * {"userId": "user1", "country": "india", "pageId": "google.com/home"}
+ */
+ public PageView(
+ @JsonProperty("pageId") String pageId,
+ @JsonProperty("userId") String userId,
+ @JsonProperty("country") String country) {
+ this.userId = userId;
+ this.country = country;
+ this.pageId = pageId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/data/UserPageViews.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/data/UserPageViews.java b/src/main/java/samza/examples/cookbook/data/UserPageViews.java
new file mode 100644
index 0000000..9e10a14
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/data/UserPageViews.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook.data;
+
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * User page view count.
+ *
+ * <p>Pairs a user id with a number of page views. Both fields are private and final: they are set once in the
+ * constructor and are only readable through {@link #getUserId()} and {@link #getCount()}.
+ */
+public class UserPageViews {
+ private final String userId;
+ private final int count;
+
+ /**
+ * Constructs a user page view count.
+ *
+ * <p>The parameters are annotated with {@code @JsonProperty}, binding them to the "userId" and "count"
+ * JSON properties respectively.
+ *
+ * @param userId the id of the user viewing the pages
+ * @param count number of page views by the user
+ */
+ public UserPageViews(
+ @JsonProperty("userId") String userId,
+ @JsonProperty("count") int count) {
+ this.userId = userId;
+ this.count = count;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public int getCount() {
+ return count;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index c320209..736d934 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -20,11 +20,6 @@
package samza.examples.wikipedia.application;
import com.google.common.collect.ImmutableList;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.Counter;
@@ -34,6 +29,8 @@ import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.TaskContext;
import org.slf4j.Logger;
@@ -41,6 +38,12 @@ import org.slf4j.LoggerFactory;
import samza.examples.wikipedia.model.WikipediaParser;
import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
/**
* This {@link StreamApplication} demonstrates the Samza fluent API by performing the same operations as
@@ -82,46 +85,32 @@ public class WikipediaApplication implements StreamApplication {
@Override
public void init(StreamGraph graph, Config config) {
+ // Messages come from WikipediaConsumer so we know that they don't have a key and don't need to be deserialized.
+ graph.setDefaultSerde(new NoOpSerde<>());
+
// Inputs
// Messages come from WikipediaConsumer so we know the type is WikipediaFeedEvent
- // They are un-keyed, so the 'k' parameter to the msgBuilder is not used
- MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
- MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
- MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+ MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID);
+ MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID);
+ MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID);
- // Output (also un-keyed, so no keyExtractor)
- OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats = graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m);
+ // Output (also un-keyed)
+ OutputStream<WikipediaStatsOutput> wikipediaStats =
+ graph.getOutputStream(STATS_STREAM_ID, new JsonSerdeV2<>(WikipediaStatsOutput.class));
// Merge inputs
- MessageStream<WikipediaFeedEvent> allWikipediaEvents = MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
+ MessageStream<WikipediaFeedEvent> allWikipediaEvents =
+ MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
// Parse, update stats, prepare output, and send
- allWikipediaEvents.map(WikipediaParser::parseEvent)
+ allWikipediaEvents
+ .map(WikipediaParser::parseEvent)
.window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()))
.map(this::formatOutput)
.sendTo(wikipediaStats);
}
/**
- * A few statistics about the incoming messages.
- */
- private static class WikipediaStats {
- // Windowed stats
- int edits = 0;
- int byteDiff = 0;
- Set<String> titles = new HashSet<String>();
- Map<String, Integer> counts = new HashMap<String, Integer>();
-
- // Total stats
- int totalEdits = 0;
-
- @Override
- public String toString() {
- return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
- }
- }
-
- /**
* Updates the windowed and total stats based on each "edit" event.
*
* Uses a KeyValueStore to persist a total edit count across restarts.
@@ -177,17 +166,46 @@ public class WikipediaApplication implements StreamApplication {
/**
* Format the stats for output to Kafka.
*/
- private Map<String, Integer> formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
-
+ private WikipediaStatsOutput formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
WikipediaStats stats = statsWindowPane.getMessage();
+ return new WikipediaStatsOutput(
+ stats.edits, stats.totalEdits, stats.byteDiff, stats.titles.size(), stats.counts);
+ }
+
+ /**
+ * A few statistics about the incoming messages.
+ */
+ private static class WikipediaStats {
+ // Windowed stats
+ int edits = 0;
+ int byteDiff = 0;
+ Set<String> titles = new HashSet<>();
+ Map<String, Integer> counts = new HashMap<>();
+
+ // Total stats
+ int totalEdits = 0;
- Map<String, Integer> counts = new HashMap<String, Integer>(stats.counts);
- counts.put("edits", stats.edits);
- counts.put("edits-all-time", stats.totalEdits);
- counts.put("bytes-added", stats.byteDiff);
- counts.put("unique-titles", stats.titles.size());
+ @Override
+ public String toString() {
+ return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
+ }
+ }
- return counts;
+ static class WikipediaStatsOutput {
+ public int edits;
+ public int editsAllTime;
+ public int bytesAdded;
+ public int uniqueTitles;
+ public Map<String, Integer> counts;
+
+ public WikipediaStatsOutput(int edits, int editsAllTime, int bytesAdded, int uniqueTitles,
+ Map<String, Integer> counts) {
+ this.edits = edits;
+ this.editsAllTime = editsAllTime;
+ this.bytesAdded = bytesAdded;
+ this.uniqueTitles = uniqueTitles;
+ this.counts = counts;
+ }
}
}