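You are viewing a plain-text version of this content. The canonical version is available from the original mailing-list archive page (the hyperlink is not preserved in this plain-text copy).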
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/09 01:26:06 UTC

[25/33] samza-hello-samza git commit: SAMZA-1441: Updated High Level API examples in hello-samza to provide serdes in code.

SAMZA-1441: Updated High Level API examples in hello-samza to provide serdes in code.

Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Author: Yan Fang <ya...@gmail.com>
Author: Prateek Maheshwari <pm...@linkedin.com>
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Author: Aleksandar Pejakovic <a....@levi9.com>
Author: Navina Ramesh <nr...@linkedin.com>
Author: vjagadish1989 <jv...@linkedin.com>
Author: Steven Aerts <st...@gmail.com>
Author: Chris Riccomini <cr...@apache.org>
Author: Manikumar Reddy <ma...@gmail.com>
Author: Yi Pan <ni...@gmail.com>
Author: Yan Fang <ya...@apache.org>
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Author: Stanislav Los <sl...@gmail.com>
Author: Ken Gidley <kg...@yahoo.com>
Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Jagadish Venkatraman <ja...@apache.org>

Closes #24 from prateekm/serde-instance


Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/e5943a00
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/e5943a00
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/e5943a00

Branch: refs/heads/master
Commit: e5943a000eef87e077c422e09dc20f09d4e876ca
Parents: 901c3a3
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Wed Oct 4 16:08:49 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Oct 4 16:08:49 2017 -0700

----------------------------------------------------------------------
 .../config/pageview-adclick-joiner.properties   |  18 +---
 src/main/config/pageview-filter.properties      |  18 +---
 src/main/config/pageview-sessionizer.properties |  18 +---
 .../config/tumbling-pageview-counter.properties |  20 +---
 ...ikipedia-application-local-runner.properties |   2 -
 .../config/wikipedia-application.properties     |   4 +-
 .../java/samza/examples/cookbook/AdClick.java   |  58 ----------
 .../java/samza/examples/cookbook/PageView.java  |  61 -----------
 .../cookbook/PageViewAdClickJoiner.java         | 108 +++++++++++--------
 .../examples/cookbook/PageViewFilterApp.java    |  47 ++++----
 .../cookbook/PageViewSessionizerApp.java        |  54 +++++-----
 .../cookbook/TumblingPageViewCounterApp.java    |  52 +++++----
 .../samza/examples/cookbook/data/AdClick.java   |  54 ++++++++++
 .../samza/examples/cookbook/data/PageView.java  |  46 ++++++++
 .../examples/cookbook/data/UserPageViews.java   |  51 +++++++++
 .../application/WikipediaApplication.java       |  98 ++++++++++-------
 16 files changed, 371 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/pageview-adclick-joiner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-adclick-joiner.properties b/src/main/config/pageview-adclick-joiner.properties
index 81ec3f6..eba7b0b 100644
--- a/src/main/config/pageview-adclick-joiner.properties
+++ b/src/main/config/pageview-adclick-joiner.properties
@@ -18,29 +18,19 @@
 # Job
 job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
 job.name=pageview-adclick-joiner
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
 
 # YARN
 yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
 
 # Task
 app.class=samza.examples.cookbook.PageViewAdClickJoiner
-task.inputs=kafka.pageview-join-input,kafka.adclick-join-input
 task.window.ms=2000
 
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
 # Kafka System
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=string
-systems.kafka.samza.key.serde=string
 systems.kafka.consumer.zookeeper.connect=localhost:2181
 systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
-job.default.system=kafka
-job.container.count=2
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/pageview-filter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-filter.properties b/src/main/config/pageview-filter.properties
index b9e8d2a..331ee1a 100644
--- a/src/main/config/pageview-filter.properties
+++ b/src/main/config/pageview-filter.properties
@@ -18,29 +18,19 @@
 # Job
 job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
 job.name=pageview-filter
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
 
 # YARN
 yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
 
 # Task
 app.class=samza.examples.cookbook.PageViewFilterApp
-task.inputs=kafka.pageview-filter-input
 task.window.ms=2000
 
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
 # Kafka System
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=string
-systems.kafka.samza.key.serde=string
 systems.kafka.consumer.zookeeper.connect=localhost:2181
 systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
-job.default.system=kafka
-job.container.count=2
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/pageview-sessionizer.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-sessionizer.properties b/src/main/config/pageview-sessionizer.properties
index 847aa87..420cdde 100644
--- a/src/main/config/pageview-sessionizer.properties
+++ b/src/main/config/pageview-sessionizer.properties
@@ -18,29 +18,19 @@
 # Job
 job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
 job.name=pageview-sessionizer
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
 
 # YARN
 yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
 
 # Task
 app.class=samza.examples.cookbook.PageViewSessionizerApp
-task.inputs=kafka.pageview-session-input
 task.window.ms=2000
 
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
 # Kafka System
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=string
-systems.kafka.samza.key.serde=string
 systems.kafka.consumer.zookeeper.connect=localhost:2181
 systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
-job.default.system=kafka
-job.container.count=2
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/tumbling-pageview-counter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/tumbling-pageview-counter.properties b/src/main/config/tumbling-pageview-counter.properties
index 09fb131..b58dbe9 100644
--- a/src/main/config/tumbling-pageview-counter.properties
+++ b/src/main/config/tumbling-pageview-counter.properties
@@ -18,29 +18,19 @@
 # Job
 job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
 job.name=tumbling-pageview-counter
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
 
 # YARN
 yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
 
 # Task
 app.class=samza.examples.cookbook.TumblingPageViewCounterApp
-task.inputs=kafka.pageview-tumbling-input
 task.window.ms=2000
 
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
 # Kafka System
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=string
-systems.kafka.samza.key.serde=string
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.consumer.zookeeper.connect=localhost:2181
 systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
-job.default.system=kafka
-job.container.count=2
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/wikipedia-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties
index 1911e68..b770f13 100644
--- a/src/main/config/wikipedia-application-local-runner.properties
+++ b/src/main/config/wikipedia-application-local-runner.properties
@@ -25,7 +25,6 @@ job.coordinator.zk.connect=localhost:2181
 task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
 
 # Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
 serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
 
@@ -39,7 +38,6 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.producer.bootstrap.servers=localhost:9092
 systems.kafka.default.stream.replication.factor=1
-systems.kafka.default.stream.samza.msg.serde=json
 
 # Streams
 streams.en-wikipedia.samza.system=wikipedia

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/wikipedia-application.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application.properties b/src/main/config/wikipedia-application.properties
index aeb8069..841fcc5 100644
--- a/src/main/config/wikipedia-application.properties
+++ b/src/main/config/wikipedia-application.properties
@@ -33,13 +33,11 @@ systems.wikipedia.port=6667
 
 # Kafka System
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.consumer.zookeeper.connect=localhost:2181
 systems.kafka.producer.bootstrap.servers=localhost:9092
 systems.kafka.default.stream.replication.factor=1
-systems.kafka.default.stream.samza.msg.serde=json
 
 # Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
 serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
 

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/AdClick.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java
deleted file mode 100644
index 2d15cec..0000000
--- a/src/main/java/samza/examples/cookbook/AdClick.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.cookbook;
-
-/**
- * Represents an ad click event.
- */
-public class AdClick {
-  /*
-   * An unique identifier for the ad
-   */
-  private final String adId;
-  /**
-   * The user that clicked the ad
-   */
-  private final String userId;
-  /**
-   * The id of the page that the ad was served from
-   */
-  private final String pageId;
-
-  public AdClick(String message) {
-    String[] adClickFields = message.split(",");
-    this.adId = adClickFields[0];
-    this.userId = adClickFields[1];
-    this.pageId = adClickFields[2];
-  }
-
-  public String getAdId() {
-    return adId;
-  }
-
-  public String getUserId() {
-    return userId;
-  }
-
-  public String getPageId() {
-    return pageId;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageView.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java
deleted file mode 100644
index 7803db7..0000000
--- a/src/main/java/samza/examples/cookbook/PageView.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package samza.examples.cookbook;
-
-/**
- * Represents a Page view event
- */
-class PageView {
-  /**
-   * The user that viewed the page
-   */
-  private final String userId;
-  /**
-   * The region that the page was viewed from
-   */
-  private final String country;
-  /**
-   * A trackingId for the page
-   */
-  private final String pageId;
-
-  /**
-   * Constructs a {@link PageView} from the provided string.
-   *
-   * @param message in the following CSV format - userId,country,url
-   */
-  PageView(String message) {
-    String[] pageViewFields = message.split(",");
-    userId = pageViewFields[0];
-    country = pageViewFields[1];
-    pageId = pageViewFields[2];
-  }
-
-  String getUserId() {
-    return userId;
-  }
-
-  String getCountry() {
-    return country;
-  }
-
-  String getPageId() {
-    return pageId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
index 94c7bc3..4f491f7 100644
--- a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
+++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
@@ -20,15 +20,18 @@ package samza.examples.cookbook;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import samza.examples.cookbook.data.AdClick;
+import samza.examples.cookbook.data.PageView;
 
 import java.time.Duration;
-import java.util.function.Function;
 
 /**
  * In this example, we join a stream of Page views with a stream of Ad clicks. For instance, this is helpful for
@@ -41,75 +44,94 @@ import java.util.function.Function;
  * <ol>
  *   <li>
  *     Ensure that the topics "pageview-join-input", "adclick-join-input" are created  <br/>
- *     ./kafka-topics.sh  --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1
+ *     ./deploy/kafka/bin/kafka-topics.sh  --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1
+ *     ./deploy/kafka/bin/kafka-topics.sh  --zookeeper localhost:2181 --create --topic adclick-join-input --partitions 2 --replication-factor 1
  *   </li>
  *   <li>
- *     Run the application using the ./bin/run-app.sh script <br/>
- *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
- *     --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties)
+ *     Run the application using the run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties
  *   </li>
  *   <li>
  *     Produce some messages to the "pageview-join-input" topic <br/>
  *     ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/>
- *     user1,india,google.com <br/>
- *     user2,china,yahoo.com
+ *     {"userId": "user1", "country": "india", "pageId":"google.com"} <br/>
+ *     {"userId": "user2", "country": "china", "pageId":"yahoo.com"}
  *   </li>
  *   <li>
  *     Produce some messages to the "adclick-join-input" topic with the same pageKey <br/>
  *     ./deploy/kafka/bin/kafka-console-producer.sh --topic adclick-join-input --broker-list localhost:9092 <br/>
- *     adClickId1,user1,google.com <br/>
- *     adClickId2,user1,yahoo.com
+ *     {"userId": "user1", "adId": "adClickId1", "pageId":"google.com"} <br/>
+ *     {"userId": "user1", "adId": "adClickId2", "pageId":"yahoo.com"}
  *   </li>
  *   <li>
- *     Consume messages from the "pageview-adclick-join-output" topic (e.g. bin/kafka-console-consumer.sh)
- *     ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output <br/>
- *     --property print.key=true
+ *     Consume messages from the "pageview-adclick-join-output" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output --property print.key=true
  *   </li>
  * </ol>
  *
  */
 public class PageViewAdClickJoiner implements StreamApplication {
 
-  private static final Logger LOG = LoggerFactory.getLogger(PageViewAdClickJoiner.class);
-  private static final String INPUT_TOPIC1 = "pageview-join-input";
-  private static final String INPUT_TOPIC2 = "adclick-join-input";
-
+  private static final String PAGEVIEW_TOPIC = "pageview-join-input";
+  private static final String AD_CLICK_TOPIC = "adclick-join-input";
   private static final String OUTPUT_TOPIC = "pageview-adclick-join-output";
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    StringSerde stringSerde = new StringSerde();
+    JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
+    JsonSerdeV2<AdClick> adClickSerde = new JsonSerdeV2<>(AdClick.class);
+    JsonSerdeV2<JoinResult> joinResultSerde = new JsonSerdeV2<>(JoinResult.class);
+
+    MessageStream<PageView> pageViews = graph.getInputStream(PAGEVIEW_TOPIC, pageViewSerde);
+    MessageStream<AdClick> adClicks = graph.getInputStream(AD_CLICK_TOPIC, adClickSerde);
+    OutputStream<JoinResult> joinResults = graph.getOutputStream(OUTPUT_TOPIC, joinResultSerde);
 
-    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC1, (k, v) -> v);
-    MessageStream<String> adClicks = graph.<String, String, String>getInputStream(INPUT_TOPIC2, (k, v) -> v);
+    JoinFunction<String, PageView, AdClick, JoinResult> pageViewAdClickJoinFunction =
+        new JoinFunction<String, PageView, AdClick, JoinResult>() {
+          @Override
+          public JoinResult apply(PageView pageView, AdClick adClick) {
+            return new JoinResult(pageView.pageId, pageView.userId, pageView.country, adClick.getAdId());
+          }
 
-    OutputStream<String, String, String> outputStream = graph
-        .getOutputStream(OUTPUT_TOPIC, m -> "", m -> m);
+          @Override
+          public String getFirstKey(PageView pageView) {
+            return pageView.pageId;
+          }
 
-    Function<String, String> pageViewKeyFn = pageView -> new PageView(pageView).getPageId();
-    Function<String, String> adClickKeyFn = adClick -> new AdClick(adClick).getPageId();
+          @Override
+          public String getSecondKey(AdClick adClick) {
+            return adClick.getPageId();
+          }
+        };
 
-    MessageStream<String> pageViewRepartitioned = pageViews.partitionBy(pageViewKeyFn);
-    MessageStream<String> adClickRepartitioned = adClicks.partitionBy(adClickKeyFn);
+    MessageStream<PageView> repartitionedPageViews =
+        pageViews
+            .partitionBy(pv -> pv.pageId, pv -> pv, KVSerde.of(stringSerde, pageViewSerde))
+            .map(KV::getValue);
 
-    pageViewRepartitioned.join(adClickRepartitioned, new JoinFunction<String, String, String, String>() {
+    MessageStream<AdClick> repartitionedAdClicks =
+        adClicks
+            .partitionBy(AdClick::getPageId, ac -> ac, KVSerde.of(stringSerde, adClickSerde))
+            .map(KV::getValue);
 
-      @Override
-      public String apply(String pageViewMsg, String adClickMsg) {
-        PageView pageView = new PageView(pageViewMsg);
-        AdClick adClick = new AdClick(adClickMsg);
-        String joinResult = String.format("%s,%s,%s", pageView.getPageId(), pageView.getCountry(), adClick.getAdId());
-        return joinResult;
-      }
+    repartitionedPageViews
+        .join(repartitionedAdClicks, pageViewAdClickJoinFunction,
+            stringSerde, pageViewSerde, adClickSerde, Duration.ofMinutes(3))
+        .sendTo(joinResults);
+  }
 
-      @Override
-      public String getFirstKey(String msg) {
-        return new PageView(msg).getPageId();
-      }
+  static class JoinResult {
+    public String pageId;
+    public String userId;
+    public String country;
+    public String adId;
 
-      @Override
-      public String getSecondKey(String msg) {
-        return new AdClick(msg).getPageId();
-      }
-    }, Duration.ofMinutes(3)).sendTo(outputStream);
+    public JoinResult(String pageId, String userId, String country, String adId) {
+      this.pageId = pageId;
+      this.userId = userId;
+      this.country = country;
+      this.adId = adId;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
index cb39553..80ce2d1 100644
--- a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
@@ -20,14 +20,14 @@ package samza.examples.cookbook;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.FilterFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.function.Function;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import samza.examples.cookbook.data.PageView;
 
 /**
  * In this example, we demonstrate re-partitioning a stream of page views and filtering out some bad events in the stream.
@@ -39,48 +39,41 @@ import java.util.function.Function;
  * <ol>
  *   <li>
  *     Ensure that the topic "pageview-filter-input" is created  <br/>
- *     ./kafka-topics.sh  --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1
+ *     ./deploy/kafka/bin/kafka-topics.sh  --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1
  *   </li>
  *   <li>
- *     Run the application using the ./bin/run-app.sh script <br/>
- *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
- *     --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties)
+ *     Run the application using the run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties
  *   </li>
  *   <li>
  *     Produce some messages to the "pageview-filter-input" topic <br/>
  *     ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-filter-input --broker-list localhost:9092 <br/>
- *     user1,india,google.com <br/>
- *     user2,china,yahoo.com
+ *     {"userId": "user1", "country": "india", "pageId":"google.com"} <br/>
+ *     {"userId": "invalidUserId", "country": "france", "pageId":"facebook.com"} <br/>
+ *     {"userId": "user2", "country": "china", "pageId":"yahoo.com"}
  *   </li>
  *   <li>
  *     Consume messages from the "pageview-filter-output" topic (e.g. bin/kafka-console-consumer.sh)
- *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output <br/>
- *     --property print.key=true    </li>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output --property print.key=true
+ *   </li>
  * </ol>
- *
  */
 public class PageViewFilterApp implements StreamApplication {
 
-  private static final Logger LOG = LoggerFactory.getLogger(PageViewFilterApp.class);
-  private static final String FILTER_KEY = "badKey";
   private static final String INPUT_TOPIC = "pageview-filter-input";
   private static final String OUTPUT_TOPIC = "pageview-filter-output";
+  private static final String INVALID_USER_ID = "invalidUserId";
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)));
 
-    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
-
-    Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
-
-    OutputStream<String, String, String> outputStream = graph
-        .getOutputStream(OUTPUT_TOPIC, keyFn, m -> m);
-
-    FilterFunction<String> filterFn = pageView -> !FILTER_KEY.equals(new PageView(pageView).getUserId());
+    MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC);
+    OutputStream<KV<String, PageView>> filteredPageViews = graph.getOutputStream(OUTPUT_TOPIC);
 
     pageViews
-        .partitionBy(keyFn)
-        .filter(filterFn)
-        .sendTo(outputStream);
+        .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+        .filter(kv -> !INVALID_USER_ID.equals(kv.value.userId))
+        .sendTo(filteredPageViews);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
index 7ec4f9d..f1000ae 100644
--- a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
+++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
@@ -20,21 +20,22 @@ package samza.examples.cookbook;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import samza.examples.cookbook.data.PageView;
+import samza.examples.cookbook.data.UserPageViews;
 
 import java.time.Duration;
-import java.util.Collection;
-import java.util.function.Function;
 
 /**
  * In this example, we group page views by userId into sessions, and compute the number of page views for each user
- * session. A session is considered closed when there is no user activity for a 3 second duration.
+ * session. A session is considered closed when there is no user activity for a 10 second duration.
  *
  * <p>Concepts covered: Using session windows to group data in a stream, Re-partitioning a stream.
  *
@@ -43,45 +44,50 @@ import java.util.function.Function;
  * <ol>
  *   <li>
  *     Ensure that the topic "pageview-session-input" is created  <br/>
- *     ./kafka-topics.sh  --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1
  *   </li>
  *   <li>
- *     Run the application using the ./bin/run-app.sh script <br/>
- *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
- *     --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties)
+ *     Run the application using the run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties
  *   </li>
  *   <li>
  *     Produce some messages to the "pageview-session-input" topic <br/>
- *     user1,india,google.com <br/>
- *     user2,china,yahoo.com
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-session-input --broker-list localhost:9092 <br/>
+ *     {"userId": "user1", "country": "india", "pageId":"google.com/home"} <br/>
+ *     {"userId": "user1", "country": "india", "pageId":"google.com/search"} <br/>
+ *     {"userId": "user2", "country": "china", "pageId":"yahoo.com/home"} <br/>
+ *     {"userId": "user2", "country": "china", "pageId":"yahoo.com/sports"} <br/>
+ *     {"userId": "user1", "country": "india", "pageId":"google.com/news"} <br/>
+ *     {"userId": "user2", "country": "china", "pageId":"yahoo.com/fashion"}
  *   </li>
  *   <li>
  *     Consume messages from the "pageview-session-output" topic (e.g. bin/kafka-console-consumer.sh)
- *     ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-sessions-output <br/>
- *     --property print.key=true
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-session-output --property print.key=true
  *   </li>
  * </ol>
  *
  */
 public class PageViewSessionizerApp implements StreamApplication {
 
-  private static final Logger LOG = LoggerFactory.getLogger(PageViewSessionizerApp.class);
   private static final String INPUT_TOPIC = "pageview-session-input";
   private static final String OUTPUT_TOPIC = "pageview-session-output";
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)));
 
-    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
-
-    OutputStream<String, String, WindowPane<String, Collection<String>>> outputStream = graph
-        .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString());
-
-    Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
+    MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC);
+    OutputStream<KV<String, UserPageViews>> userPageViews =
+        graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)));
 
     pageViews
-        .partitionBy(keyFn)
-        .window(Windows.keyedSessionWindow(keyFn, Duration.ofSeconds(3)))
-        .sendTo(outputStream);
+        .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+        .window(Windows.keyedSessionWindow(kv -> kv.value.userId, Duration.ofSeconds(10)))
+        .map(windowPane -> {
+            String userId = windowPane.getKey().getKey();
+            int views = windowPane.getMessage().size();
+            return KV.of(userId, new UserPageViews(userId, views));
+          })
+        .sendTo(userPageViews);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
index 1bc6ff4..0809180 100644
--- a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
+++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
@@ -20,18 +20,18 @@ package samza.examples.cookbook;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import samza.examples.cookbook.data.PageView;
+import samza.examples.cookbook.data.UserPageViews;
 
 import java.time.Duration;
-import java.util.Collection;
-import java.util.function.Function;
 
 /**
  * In this example, we group a stream of page views by country, and compute the number of page views over a tumbling time
@@ -46,45 +46,51 @@ import java.util.function.Function;
  * <ol>
  *   <li>
  *     Ensure that the topic "pageview-tumbling-input" is created  <br/>
- *     ./kafka-topics.sh  --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1
  *   </li>
  *   <li>
- *     Run the application using the ./bin/run-app.sh script <br/>
- *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
- *     --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties)
+ *     Run the application using the run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties
  *   </li>
  *   <li>
- *     Produce some messages to the "pageview-tumbling-input" topic <br/>
+ *     Produce some messages to the "pageview-tumbling-input" topic, waiting for some time between messages <br/>
        ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-tumbling-input --broker-list localhost:9092 <br/>
-       user1,india,google.com <br/>
- *     user2,china,yahoo.com
+ *     {"userId": "user1", "country": "india", "pageId":"google.com/home"} <br/>
+ *     {"userId": "user1", "country": "india", "pageId":"google.com/search"} <br/>
+ *     {"userId": "user2", "country": "china", "pageId":"yahoo.com/home"} <br/>
+ *     {"userId": "user2", "country": "china", "pageId":"yahoo.com/sports"} <br/>
+ *     {"userId": "user1", "country": "india", "pageId":"google.com/news"} <br/>
+ *     {"userId": "user2", "country": "china", "pageId":"yahoo.com/fashion"}
  *   </li>
  *   <li>
  *     Consume messages from the "pageview-tumbling-output" topic (e.g. bin/kafka-console-consumer.sh)
- *     ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output --property print.key=true <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output --property print.key=true <br/>
  *   </li>
  * </ol>
  *
  */
 public class TumblingPageViewCounterApp implements StreamApplication {
 
-  private static final Logger LOG = LoggerFactory.getLogger(TumblingPageViewCounterApp.class);
   private static final String INPUT_TOPIC = "pageview-tumbling-input";
   private static final String OUTPUT_TOPIC = "pageview-tumbling-output";
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)));
 
-    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
-
-    OutputStream<String, String, WindowPane<String, Integer>> outputStream = graph
-        .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> m.getMessage().toString());
-
-    Function<String, String> keyFn = pageView -> new PageView(pageView).getCountry();
+    MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC);
+    OutputStream<KV<String, UserPageViews>> outputStream =
+        graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)));
 
     pageViews
-        .partitionBy(keyFn)
-        .window(Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(3), () -> 0, (m, prevCount) -> prevCount + 1))
+        .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+        .window(Windows.keyedTumblingWindow(
+            kv -> kv.key, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1))
+        .map(windowPane -> {
+            String userId = windowPane.getKey().getKey();
+            int views = windowPane.getMessage();
+            return KV.of(userId, new UserPageViews(userId, views));
+          })
         .sendTo(outputStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/data/AdClick.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/data/AdClick.java b/src/main/java/samza/examples/cookbook/data/AdClick.java
new file mode 100644
index 0000000..42d45dc
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/data/AdClick.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.cookbook.data;
+
+/**
+ * An ad click event: which user clicked which ad, and on which page.
+ */
+public class AdClick {
+
+  private String pageId; // the unique id of the page that the ad was clicked on
+  private String adId; // a unique id for the ad
+  private String userId; // the id of the user that clicked the ad
+
+  public String getPageId() {
+    return pageId;
+  }
+
+  public void setPageId(String pageId) {
+    this.pageId = pageId;
+  }
+
+  public String getAdId() {
+    return adId;
+  }
+
+  public void setAdId(String adId) {
+    this.adId = adId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public void setUserId(String userId) {
+    this.userId = userId;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/data/PageView.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/data/PageView.java b/src/main/java/samza/examples/cookbook/data/PageView.java
new file mode 100644
index 0000000..9640694
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/data/PageView.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook.data;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A page view event, deserialized from JSON with keys "userId", "country" and "pageId".
+ */
+public class PageView {
+  public final String userId;
+  public final String country;
+  public final String pageId;
+
+  /**
+   * Constructs a page view event. Each {@code @JsonProperty} name binds a parameter to an input JSON key.
+   *
+   * @param pageId the id for the page that was viewed
+   * @param userId the user that viewed the page
+   * @param country the country that the page was viewed from (JSON key "country", not "countryId")
+   */
+  public PageView(
+      @JsonProperty("pageId") String pageId,
+      @JsonProperty("userId") String userId,
+      @JsonProperty("country") String country) {
+    this.userId = userId;
+    this.country = country;
+    this.pageId = pageId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/data/UserPageViews.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/data/UserPageViews.java b/src/main/java/samza/examples/cookbook/data/UserPageViews.java
new file mode 100644
index 0000000..9e10a14
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/data/UserPageViews.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook.data;
+
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * The number of page views counted for a single user, e.g. within one session or tumbling window.
+ */
+public class UserPageViews {
+  private final String userId;
+  private final int count;
+
+  /**
+   * Constructs an immutable user page view count.
+   *
+   * @param userId the id of the user viewing the pages
+   * @param count the number of page views by the user
+   */
+  public UserPageViews(
+      @JsonProperty("userId") String userId,
+      @JsonProperty("count") int count) {
+    this.userId = userId;
+    this.count = count;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public int getCount() {
+    return count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index c320209..736d934 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -20,11 +20,6 @@
 package samza.examples.wikipedia.application;
 
 import com.google.common.collect.ImmutableList;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.Counter;
@@ -34,6 +29,8 @@ import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.TaskContext;
 import org.slf4j.Logger;
@@ -41,6 +38,12 @@ import org.slf4j.LoggerFactory;
 import samza.examples.wikipedia.model.WikipediaParser;
 import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
 
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 
 /**
  * This {@link StreamApplication} demonstrates the Samza fluent API by performing the same operations as
@@ -82,46 +85,32 @@ public class WikipediaApplication implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    // Messages come from WikipediaConsumer so we know that they don't have a key and don't need to be deserialized.
+    graph.setDefaultSerde(new NoOpSerde<>());
+
     // Inputs
     // Messages come from WikipediaConsumer so we know the type is WikipediaFeedEvent
-    // They are un-keyed, so the 'k' parameter to the msgBuilder is not used
-    MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
-    MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
-    MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+    MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID);
+    MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID);
+    MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID);
 
-    // Output (also un-keyed, so no keyExtractor)
-    OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats = graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m);
+    // Output (also un-keyed)
+    OutputStream<WikipediaStatsOutput> wikipediaStats =
+        graph.getOutputStream(STATS_STREAM_ID, new JsonSerdeV2<>(WikipediaStatsOutput.class));
 
     // Merge inputs
-    MessageStream<WikipediaFeedEvent> allWikipediaEvents = MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
+    MessageStream<WikipediaFeedEvent> allWikipediaEvents =
+        MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
 
     // Parse, update stats, prepare output, and send
-    allWikipediaEvents.map(WikipediaParser::parseEvent)
+    allWikipediaEvents
+        .map(WikipediaParser::parseEvent)
         .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()))
         .map(this::formatOutput)
         .sendTo(wikipediaStats);
   }
 
   /**
-   * A few statistics about the incoming messages.
-   */
-  private static class WikipediaStats {
-    // Windowed stats
-    int edits = 0;
-    int byteDiff = 0;
-    Set<String> titles = new HashSet<String>();
-    Map<String, Integer> counts = new HashMap<String, Integer>();
-
-    // Total stats
-    int totalEdits = 0;
-
-    @Override
-    public String toString() {
-      return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
-    }
-  }
-
-  /**
    * Updates the windowed and total stats based on each "edit" event.
    *
    * Uses a KeyValueStore to persist a total edit count across restarts.
@@ -177,17 +166,46 @@ public class WikipediaApplication implements StreamApplication {
   /**
    * Format the stats for output to Kafka.
    */
-  private Map<String, Integer> formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
-
+  private WikipediaStatsOutput formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
     WikipediaStats stats = statsWindowPane.getMessage();
+    return new WikipediaStatsOutput(
+        stats.edits, stats.totalEdits, stats.byteDiff, stats.titles.size(), stats.counts);
+  }
+
+  /**
+   * A few statistics about the incoming messages.
+   */
+  private static class WikipediaStats {
+    // Windowed stats
+    int edits = 0;
+    int byteDiff = 0;
+    Set<String> titles = new HashSet<>();
+    Map<String, Integer> counts = new HashMap<>();
+
+    // Total stats
+    int totalEdits = 0;
 
-    Map<String, Integer> counts = new HashMap<String, Integer>(stats.counts);
-    counts.put("edits", stats.edits);
-    counts.put("edits-all-time", stats.totalEdits);
-    counts.put("bytes-added", stats.byteDiff);
-    counts.put("unique-titles", stats.titles.size());
+    @Override
+    public String toString() {
+      return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
+    }
+  }
 
-    return counts;
+  static class WikipediaStatsOutput {
+    public int edits;
+    public int editsAllTime;
+    public int bytesAdded;
+    public int uniqueTitles;
+    public Map<String, Integer> counts;
+
+    public WikipediaStatsOutput(int edits, int editsAllTime, int bytesAdded, int uniqueTitles,
+        Map<String, Integer> counts) {
+      this.edits = edits;
+      this.editsAllTime = editsAllTime;
+      this.bytesAdded = bytesAdded;
+      this.uniqueTitles = uniqueTitles;
+      this.counts = counts;
+    }
   }
 }