You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/09 01:25:57 UTC

[16/33] samza-hello-samza git commit: SAMZA-1225: Add demo examples for programming with the fluent API

SAMZA-1225: Add demo examples for programming with the fluent API


Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/c87ed565
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/c87ed565
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/c87ed565

Branch: refs/heads/master
Commit: c87ed565fbaebf2ac88376143c65e9f52f7a8801
Parents: 4d20c2b
Author: vjagadish1989 <jv...@linkedin.com>
Authored: Wed Apr 19 16:52:22 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Thu Apr 20 18:09:50 2017 -0700

----------------------------------------------------------------------
 bin/grid                                        |   4 +-
 gradle.properties                               |   2 +-
 pom.xml                                         |   6 +-
 src/main/assembly/src.xml                       |  20 ++++
 .../config/pageview-adclick-joiner.properties   |  46 ++++++++
 src/main/config/pageview-filter.properties      |  46 ++++++++
 src/main/config/pageview-sessionizer.properties |  46 ++++++++
 .../config/tumbling-pageview-counter.properties |  46 ++++++++
 .../java/samza/examples/cookbook/AdClick.java   |  58 ++++++++++
 .../java/samza/examples/cookbook/PageView.java  |  61 ++++++++++
 .../cookbook/PageViewAdClickJoiner.java         | 115 +++++++++++++++++++
 .../examples/cookbook/PageViewFilterApp.java    |  86 ++++++++++++++
 .../cookbook/PageViewSessionizerApp.java        |  87 ++++++++++++++
 .../cookbook/TumblingPageViewCounterApp.java    |  90 +++++++++++++++
 14 files changed, 707 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index ec9d210..7d2112b 100755
--- a/bin/grid
+++ b/bin/grid
@@ -35,7 +35,7 @@ DOWNLOAD_CACHE_DIR=$HOME/.samza/download
 COMMAND=$1
 SYSTEM=$2
 
-DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
+DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
 DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
 DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
 
@@ -96,7 +96,7 @@ install_yarn() {
 
 install_kafka() {
   mkdir -p "$DEPLOY_ROOT_DIR"
-  install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.0.1
+  install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.1.1
   # have to use SIGTERM since nohup on appears to ignore SIGINT
   # and Kafka switched to SIGINT in KAFKA-1031.
   sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 1bc7633..294875b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -18,7 +18,7 @@
  */
 
 SAMZA_VERSION=0.13.0-SNAPSHOT
-KAFKA_VERSION=0.10.0.1
+KAFKA_VERSION=0.10.1.1
 HADOOP_VERSION=2.6.1
 
 SLF4J_VERSION = 1.7.7

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 100b2b1..9a0b54e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@ under the License.
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
-      <version>0.10.0.1</version>
+      <version>0.10.1.1</version>
     </dependency>
     <dependency>
       <groupId>org.schwering</groupId>
@@ -240,8 +240,8 @@ under the License.
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.1</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
       <plugin>

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 3f2e4a8..e280a9a 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -51,6 +51,26 @@
       <outputDirectory>config</outputDirectory>
       <filtered>true</filtered>
     </file>
+    <file>
+      <source>${basedir}/src/main/config/tumbling-pageview-counter.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
+    <file>
+      <source>${basedir}/src/main/config/pageview-sessionizer.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
+    <file>
+      <source>${basedir}/src/main/config/pageview-filter.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
+    <file>
+      <source>${basedir}/src/main/config/pageview-adclick-joiner.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
   </files>
   <dependencySets>
     <dependencySet>

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/config/pageview-adclick-joiner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-adclick-joiner.properties b/src/main/config/pageview-adclick-joiner.properties
new file mode 100644
index 0000000..81ec3f6
--- /dev/null
+++ b/src/main/config/pageview-adclick-joiner.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-adclick-joiner
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewAdClickJoiner
+task.inputs=kafka.pageview-join-input,kafka.adclick-join-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/config/pageview-filter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-filter.properties b/src/main/config/pageview-filter.properties
new file mode 100644
index 0000000..b9e8d2a
--- /dev/null
+++ b/src/main/config/pageview-filter.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-filter
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewFilterApp
+task.inputs=kafka.pageview-filter-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/config/pageview-sessionizer.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-sessionizer.properties b/src/main/config/pageview-sessionizer.properties
new file mode 100644
index 0000000..847aa87
--- /dev/null
+++ b/src/main/config/pageview-sessionizer.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-sessionizer
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewSessionizerApp
+task.inputs=kafka.pageview-session-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/config/tumbling-pageview-counter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/tumbling-pageview-counter.properties b/src/main/config/tumbling-pageview-counter.properties
new file mode 100644
index 0000000..09fb131
--- /dev/null
+++ b/src/main/config/tumbling-pageview-counter.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=tumbling-pageview-counter
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.TumblingPageViewCounterApp
+task.inputs=kafka.pageview-tumbling-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/AdClick.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java
new file mode 100644
index 0000000..2d15cec
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/AdClick.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.cookbook;
+
+/**
+ * Represents an ad click event, parsed from a comma-separated message of the form
+ * {@code adId,userId,pageId}.
+ */
+public class AdClick {
+  /** Minimum number of comma-separated fields in a well-formed ad click message. */
+  private static final int NUM_FIELDS = 3;
+
+  /**
+   * A unique identifier for the ad
+   */
+  private final String adId;
+  /**
+   * The user that clicked the ad
+   */
+  private final String userId;
+  /**
+   * The id of the page that the ad was served from
+   */
+  private final String pageId;
+
+  /**
+   * Constructs an {@link AdClick} from the provided string.
+   *
+   * @param message in the following CSV format - adId,userId,pageId
+   * @throws IllegalArgumentException if {@code message} has fewer than three comma-separated fields
+   */
+  public AdClick(String message) {
+    String[] adClickFields = message.split(",");
+    if (adClickFields.length < NUM_FIELDS) {
+      // Fail with a descriptive error rather than an opaque ArrayIndexOutOfBoundsException.
+      throw new IllegalArgumentException("Malformed ad click, expected adId,userId,pageId but got: " + message);
+    }
+    this.adId = adClickFields[0];
+    this.userId = adClickFields[1];
+    this.pageId = adClickFields[2];
+  }
+
+  /** @return the unique identifier of the clicked ad */
+  public String getAdId() {
+    return adId;
+  }
+
+  /** @return the id of the user that clicked the ad */
+  public String getUserId() {
+    return userId;
+  }
+
+  /** @return the id of the page that served the ad */
+  public String getPageId() {
+    return pageId;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/PageView.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java
new file mode 100644
index 0000000..7803db7
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageView.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+/**
+ * Represents a page view event, parsed from a comma-separated message of the form
+ * {@code userId,country,pageId}.
+ */
+class PageView {
+  /** Minimum number of comma-separated fields in a well-formed page view message. */
+  private static final int NUM_FIELDS = 3;
+
+  /**
+   * The user that viewed the page
+   */
+  private final String userId;
+  /**
+   * The region that the page was viewed from
+   */
+  private final String country;
+  /**
+   * A tracking id for the page (e.g. its url)
+   */
+  private final String pageId;
+
+  /**
+   * Constructs a {@link PageView} from the provided string.
+   *
+   * @param message in the following CSV format - userId,country,pageId
+   * @throws IllegalArgumentException if {@code message} has fewer than three comma-separated fields
+   */
+  PageView(String message) {
+    String[] pageViewFields = message.split(",");
+    if (pageViewFields.length < NUM_FIELDS) {
+      // Fail with a descriptive error rather than an opaque ArrayIndexOutOfBoundsException.
+      throw new IllegalArgumentException("Malformed page view, expected userId,country,pageId but got: " + message);
+    }
+    userId = pageViewFields[0];
+    country = pageViewFields[1];
+    pageId = pageViewFields[2];
+  }
+
+  /** @return the id of the user that viewed the page */
+  String getUserId() {
+    return userId;
+  }
+
+  /** @return the region that the page was viewed from */
+  String getCountry() {
+    return country;
+  }
+
+  /** @return the tracking id of the viewed page */
+  String getPageId() {
+    return pageId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
new file mode 100644
index 0000000..94c7bc3
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.function.Function;
+
+/**
+ * In this example, we join a stream of page views with a stream of ad clicks. For instance, this is helpful for
+ * analyzing which pages served an ad that was clicked.
+ *
+ * <p> Concepts covered: Performing stream to stream Joins.
+ *
+ * <p> Page views ({@code userId,country,pageId}) and ad clicks ({@code adId,userId,pageId}) are both re-partitioned
+ * by pageId and joined on it; each join result is emitted as {@code pageId,country,adId} with an empty key.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ *   <li>
+ *     Ensure that the topics "pageview-join-input", "adclick-join-input" are created  <br/>
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1 <br/>
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic adclick-join-input --partitions 2 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the ./bin/run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ *     --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties
+ *   </li>
+ *   <li>
+ *     Produce some messages to the "pageview-join-input" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/>
+ *     user1,india,google.com <br/>
+ *     user2,china,yahoo.com
+ *   </li>
+ *   <li>
+ *     Produce some messages to the "adclick-join-input" topic with the same pageId <br/>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic adclick-join-input --broker-list localhost:9092 <br/>
+ *     adClickId1,user1,google.com <br/>
+ *     adClickId2,user1,yahoo.com
+ *   </li>
+ *   <li>
+ *     Consume messages from the "pageview-adclick-join-output" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output <br/>
+ *     --property print.key=true
+ *   </li>
+ * </ol>
+ *
+ */
+public class PageViewAdClickJoiner implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PageViewAdClickJoiner.class);
+  private static final String INPUT_TOPIC1 = "pageview-join-input";
+  private static final String INPUT_TOPIC2 = "adclick-join-input";
+
+  private static final String OUTPUT_TOPIC = "pageview-adclick-join-output";
+
+  /** Time-to-live passed to the stream-stream join. */
+  private static final Duration JOIN_TTL = Duration.ofMinutes(3);
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC1, (k, v) -> v);
+    MessageStream<String> adClicks = graph.<String, String, String>getInputStream(INPUT_TOPIC2, (k, v) -> v);
+
+    // Every join result is written with an empty key; the message is the joined CSV string itself.
+    OutputStream<String, String, String> outputStream = graph
+        .getOutputStream(OUTPUT_TOPIC, m -> "", m -> m);
+
+    // Re-partition both streams by pageId so that page views and ad clicks for the same page can be joined.
+    Function<String, String> pageViewKeyFn = pageView -> new PageView(pageView).getPageId();
+    Function<String, String> adClickKeyFn = adClick -> new AdClick(adClick).getPageId();
+
+    MessageStream<String> pageViewRepartitioned = pageViews.partitionBy(pageViewKeyFn);
+    MessageStream<String> adClickRepartitioned = adClicks.partitionBy(adClickKeyFn);
+
+    pageViewRepartitioned.join(adClickRepartitioned, new JoinFunction<String, String, String, String>() {
+
+      @Override
+      public String apply(String pageViewMsg, String adClickMsg) {
+        PageView pageView = new PageView(pageViewMsg);
+        AdClick adClick = new AdClick(adClickMsg);
+        return String.format("%s,%s,%s", pageView.getPageId(), pageView.getCountry(), adClick.getAdId());
+      }
+
+      @Override
+      public String getFirstKey(String msg) {
+        return new PageView(msg).getPageId();
+      }
+
+      @Override
+      public String getSecondKey(String msg) {
+        return new AdClick(msg).getPageId();
+      }
+    }, JOIN_TTL).sendTo(outputStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
new file mode 100644
index 0000000..cb39553
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Function;
+
+/**
+ * In this example, we demonstrate re-partitioning a stream of page views and filtering out some bad events in the stream.
+ *
+ * <p>Concepts covered: Using stateless operators on a stream, Re-partitioning a stream.
+ *
+ * <p>Page views ({@code userId,country,pageId}) whose userId is "badKey" are dropped; the remaining page views are
+ * re-partitioned by userId and written to the output topic keyed by userId.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ *   <li>
+ *     Ensure that the topic "pageview-filter-input" is created  <br/>
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the ./bin/run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ *     --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties
+ *   </li>
+ *   <li>
+ *     Produce some messages to the "pageview-filter-input" topic (the page view from "badKey" is filtered out) <br/>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-filter-input --broker-list localhost:9092 <br/>
+ *     user1,india,google.com <br/>
+ *     user2,china,yahoo.com <br/>
+ *     badKey,usa,bing.com
+ *   </li>
+ *   <li>
+ *     Consume messages from the "pageview-filter-output" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output <br/>
+ *     --property print.key=true
+ *   </li>
+ * </ol>
+ *
+ */
+public class PageViewFilterApp implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PageViewFilterApp.class);
+  private static final String FILTER_KEY = "badKey";
+  private static final String INPUT_TOPIC = "pageview-filter-input";
+  private static final String OUTPUT_TOPIC = "pageview-filter-output";
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
+
+    Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
+
+    OutputStream<String, String, String> outputStream = graph
+        .getOutputStream(OUTPUT_TOPIC, keyFn, m -> m);
+
+    FilterFunction<String> filterFn = pageView -> !FILTER_KEY.equals(new PageView(pageView).getUserId());
+
+    // Filter first so that dropped events are never shipped through the re-partitioning step. The filter is
+    // stateless and per-message, so the set of messages reaching the output topic is unchanged by this ordering.
+    pageViews
+        .filter(filterFn)
+        .partitionBy(keyFn)
+        .sendTo(outputStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
new file mode 100644
index 0000000..7ec4f9d
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.function.Function;
+
+/**
+ * In this example, we group page views by userId into sessions, and compute the number of page views for each user
+ * session. A session is considered closed when there is no user activity for a 3 second duration.
+ *
+ * <p>Concepts covered: Using session windows to group data in a stream, Re-partitioning a stream.
+ *
+ * <p>Each session window result is written to the output topic with the userId as the key and the number of page
+ * views in the session as the message.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ *   <li>
+ *     Ensure that the topic "pageview-session-input" is created  <br/>
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the ./bin/run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ *     --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties
+ *   </li>
+ *   <li>
+ *     Produce some messages to the "pageview-session-input" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-session-input --broker-list localhost:9092 <br/>
+ *     user1,india,google.com <br/>
+ *     user2,china,yahoo.com
+ *   </li>
+ *   <li>
+ *     Consume messages from the "pageview-session-output" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-session-output <br/>
+ *     --property print.key=true
+ *   </li>
+ * </ol>
+ *
+ */
+public class PageViewSessionizerApp implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PageViewSessionizerApp.class);
+  private static final String INPUT_TOPIC = "pageview-session-input";
+  private static final String OUTPUT_TOPIC = "pageview-session-output";
+
+  /** A session is closed after this much inactivity from its user. */
+  private static final Duration SESSION_GAP = Duration.ofSeconds(3);
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
+
+    // Key each session pane by its userId; the message is the number of page views in that session.
+    OutputStream<String, String, WindowPane<String, Collection<String>>> outputStream = graph
+        .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> String.valueOf(m.getMessage().size()));
+
+    Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
+
+    pageViews
+        .partitionBy(keyFn)
+        .window(Windows.keyedSessionWindow(keyFn, SESSION_GAP))
+        .sendTo(outputStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
new file mode 100644
index 0000000..1bc6ff4
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.function.Function;
+
+/**
+ * In this example, we group a stream of page views by country, and compute the number of page views over a tumbling time
+ * window.
+ *
+ * <p> Concepts covered: Performing Group-By style aggregations on tumbling time windows.
+ *
+ * <p> Tumbling windows divide a stream into a set of contiguous, fixed-sized, non-overlapping time intervals.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ *   <li>
+ *     Ensure that the topic "pageview-tumbling-input" is created  <br/>
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the ./deploy/samza/bin/run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ *     --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties
+ *   </li>
+ *   <li>
+ *     Produce some messages to the "pageview-tumbling-input" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-tumbling-input --broker-list localhost:9092 <br/>
+ *     user1,india,google.com <br/>
+ *     user2,china,yahoo.com
+ *   </li>
+ *   <li>
+ *     Consume messages from the "pageview-tumbling-output" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output --property print.key=true
+ *   </li>
+ * </ol>
+ *
+ */
+public class TumblingPageViewCounterApp implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TumblingPageViewCounterApp.class);
+  private static final String INPUT_TOPIC = "pageview-tumbling-input";
+  private static final String OUTPUT_TOPIC = "pageview-tumbling-output";
+
+  /** Length of each contiguous, non-overlapping tumbling window. */
+  private static final Duration WINDOW_SIZE = Duration.ofSeconds(3);
+
+  /**
+   * Builds the per-country page-view counting pipeline.
+   *
+   * <p> Raw page-view lines are read from {@code INPUT_TOPIC} and re-partitioned by country. They are then counted
+   * in keyed tumbling windows of {@code WINDOW_SIZE}. Each window pane is written to {@code OUTPUT_TOPIC}, keyed by
+   * country, with the count as its string value.
+   *
+   * @param graph the stream graph that receives the input stream, operators and output stream
+   * @param config the application configuration (unused here)
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    // Keep only the message value (the raw page-view line); the message key is discarded.
+    MessageStream<String> rawPageViews =
+        graph.<String, String, String>getInputStream(INPUT_TOPIC, (key, value) -> value);
+
+    // Each emitted record uses the window key (country) as its key and the running count as its value.
+    OutputStream<String, String, WindowPane<String, Integer>> countsByCountry =
+        graph.getOutputStream(OUTPUT_TOPIC,
+            pane -> pane.getKey().getKey(),
+            pane -> pane.getMessage().toString());
+
+    Function<String, String> countryOf = line -> new PageView(line).getCountry();
+
+    // Every page view seen in a window bumps that window's count by one.
+    FoldLeftFunction<String, Integer> increment = (line, runningCount) -> runningCount + 1;
+
+    rawPageViews
+        .partitionBy(countryOf)
+        .window(Windows.keyedTumblingWindow(countryOf, WINDOW_SIZE, () -> 0, increment))
+        .sendTo(countsByCountry);
+  }
+}