Posted to commits@samza.apache.org by jm...@apache.org on 2017/06/10 00:17:09 UTC

[2/2] samza-hello-samza git commit: Merge branch 'latest' into 'master' for 0.13.0 release

Merge branch 'latest' into 'master' for 0.13.0 release

Author: Jacob Maes <jm...@linkedin.com>
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Author: Navina Ramesh <nr...@linkedin.com>
Author: Jacob Maes <ja...@gmail.com>
Author: vjagadish1989 <jv...@linkedin.com>
Author: Yi Pan <ni...@gmail.com>
Author: Aleksandar Pejakovic <a....@levi9.com>
Author: Ken Gidley <kg...@yahoo.com>
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Author: Vishal Kuo <vi...@gmail.com>
Author: Branislav Cogic <b....@levi9.com>
Author: Stanislav Los <sl...@gmail.com>
Author: Steven Aerts <st...@gmail.com>
Author: Yan Fang <ya...@gmail.com>

Reviewers: Yi Pan (Data Infrastructure) <ni...@gmail.com>

Closes #21 from jmakes/master


Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/1a34a83d
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/1a34a83d
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/1a34a83d

Branch: refs/heads/master
Commit: 1a34a83d132ffe0e6a4a08d3fc3646cd14d4e0ad
Parents: 4384cb0
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri Jun 9 17:16:43 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri Jun 9 17:16:43 2017 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 README.md                                       |   6 +-
 bin/grid                                        |  56 +-
 bin/merge-pull-request.py                       | 508 +++++++++++++++++++
 bin/run-wikipedia-zk-application.sh             |  30 ++
 build.gradle                                    |   2 +
 gradle.properties                               |   4 +-
 pom.xml                                         |  10 +-
 src/main/assembly/src.xml                       |  27 +-
 .../config/pageview-adclick-joiner.properties   |  46 ++
 src/main/config/pageview-filter.properties      |  46 ++
 src/main/config/pageview-sessionizer.properties |  46 ++
 .../config/tumbling-pageview-counter.properties |  46 ++
 ...ikipedia-application-local-runner.properties |  59 +++
 .../config/wikipedia-application.properties     |  67 +++
 src/main/config/wikipedia-parser.properties     |   6 -
 src/main/config/wikipedia-stats.properties      |  10 +-
 .../java/samza/examples/cookbook/AdClick.java   |  58 +++
 .../java/samza/examples/cookbook/PageView.java  |  61 +++
 .../cookbook/PageViewAdClickJoiner.java         | 115 +++++
 .../examples/cookbook/PageViewFilterApp.java    |  86 ++++
 .../cookbook/PageViewSessionizerApp.java        |  87 ++++
 .../cookbook/TumblingPageViewCounterApp.java    |  90 ++++
 .../application/WikipediaApplication.java       | 193 +++++++
 .../WikipediaZkLocalApplication.java            |  54 ++
 .../wikipedia/model/WikipediaParser.java        |  80 +++
 .../task/WikipediaParserStreamTask.java         |  57 +--
 .../task/WikipediaStatsStreamTask.java          |  24 +-
 src/main/resources/log4j.xml                    |  19 +-
 29 files changed, 1794 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 849ce6a..f31af00 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,3 +30,4 @@ deploy
 *.swp
 build/
 .gradle/
+state

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 0a71cf6..0f80e9e 100644
--- a/README.md
+++ b/README.md
@@ -3,12 +3,12 @@ hello-samza
 
 Hello Samza is a starter project for [Apache Samza](http://samza.apache.org/) jobs.
 
-Please see [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) to get started.
+Please see [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) to get started.
 
 ### Pull requests and questions
 
-[Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) is developed as part of the [Apache Samza](http://samza.apache.org) project. Please direct questions, improvements and bug fixes there. Questions about [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) are welcome on the [dev list](http://samza.apache.org/community/mailing-lists.html) and the [Samza JIRA](https://issues.apache.org/jira/browse/SAMZA) has a hello-samza component for filing tickets.
+[Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) is developed as part of the [Apache Samza](http://samza.apache.org) project. Please direct questions, improvements and bug fixes there. Questions about [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) are welcome on the [dev list](http://samza.apache.org/community/mailing-lists.html) and the [Samza JIRA](https://issues.apache.org/jira/browse/SAMZA) has a hello-samza component for filing tickets.
 
 ### Contribution
 
-To start contributing on [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) first read [Rules](http://samza.apache.org/contribute/rules.html) and [Contributor Corner](https://cwiki.apache.org/confluence/display/SAMZA/Contributor%27s+Corner). Notice that [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) git repository does not support git pull request.
\ No newline at end of file
+To start contributing on [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) first read [Rules](http://samza.apache.org/contribute/rules.html) and [Contributor Corner](https://cwiki.apache.org/confluence/display/SAMZA/Contributor%27s+Corner). Notice that [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) git repository does not support git pull request.

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index 497fa06..5dff403 100755
--- a/bin/grid
+++ b/bin/grid
@@ -35,10 +35,16 @@ DOWNLOAD_CACHE_DIR=$HOME/.samza/download
 COMMAND=$1
 SYSTEM=$2
 
-DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
+DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
 DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
 DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
 
+SERVICE_WAIT_TIMEOUT_SEC=20
+ZOOKEEPER_PORT=2181
+RESOURCEMANAGER_PORT=8032
+NODEMANAGER_PORT=8042
+KAFKA_PORT=9092
+
 bootstrap() {
   echo "Bootstrapping the system..."
   stop_all
@@ -49,6 +55,16 @@ bootstrap() {
   exit 0
 }
 
+standalone() {
+  echo "Setting up the system..."
+  stop_all
+  rm -rf "$DEPLOY_ROOT_DIR"
+  mkdir "$DEPLOY_ROOT_DIR"
+  install_all_without_yarn
+  start_all_without_yarn
+  exit 0
+}
+
 install_all() {
   $DIR/grid install samza
   $DIR/grid install zookeeper
@@ -56,6 +72,12 @@ install_all() {
   $DIR/grid install kafka
 }
 
+install_all_without_yarn() {
+  $DIR/grid install samza
+  $DIR/grid install zookeeper
+  $DIR/grid install kafka
+}
+
 install_samza() {
   mkdir -p "$DEPLOY_ROOT_DIR"
   if [ -d "$DOWNLOAD_CACHE_DIR/samza/.git" ]; then
@@ -90,7 +112,7 @@ install_yarn() {
 
 install_kafka() {
   mkdir -p "$DEPLOY_ROOT_DIR"
-  install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.0.1
+  install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.1.1
   # have to use SIGTERM since nohup on appears to ignore SIGINT
   # and Kafka switched to SIGINT in KAFKA-1031.
   sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh
@@ -122,10 +144,16 @@ start_all() {
   $DIR/grid start kafka
 }
 
+start_all_without_yarn() {
+  $DIR/grid start zookeeper
+  $DIR/grid start kafka
+}
+
 start_zookeeper() {
   if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh ]; then
     cd $DEPLOY_ROOT_DIR/$SYSTEM
     bin/zkServer.sh start
+    wait_for_service "zookeeper" $ZOOKEEPER_PORT
     cd - > /dev/null
   else
     echo 'Zookeeper is not installed. Run: bin/grid install zookeeper'
@@ -135,7 +163,9 @@ start_zookeeper() {
 start_yarn() {
   if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh ]; then
     $DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh start resourcemanager
+    wait_for_service "resourcemanager" $RESOURCEMANAGER_PORT
     $DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh start nodemanager
+    wait_for_service "nodemanager" $NODEMANAGER_PORT
   else
     echo 'YARN is not installed. Run: bin/grid install yarn'
   fi
@@ -147,11 +177,29 @@ start_kafka() {
     cd $DEPLOY_ROOT_DIR/$SYSTEM
     nohup bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &
     cd - > /dev/null
+    wait_for_service "kafka" $KAFKA_PORT
   else
     echo 'Kafka is not installed. Run: bin/grid install kafka'
   fi
 }
 
+wait_for_service() {
+  local SERVICE_NAME=$1
+  local PORT=$2
+  echo "Waiting for $SERVICE_NAME to start..."
+  local CURRENT_WAIT_TIME=0
+  until $(nc -w 1 localhost $PORT); do
+    printf '.'
+    sleep 1
+    if [ $((++CURRENT_WAIT_TIME)) -eq $SERVICE_WAIT_TIMEOUT_SEC ]; then
+      printf "\nError: timed out while waiting for $SERVICE_NAME to start.\n"
+      exit 1
+    fi
+  done
+  printf '\n'
+  echo "$SERVICE_NAME has started";
+}
+
 stop_all() {
   $DIR/grid stop kafka
   $DIR/grid stop yarn
@@ -191,6 +239,9 @@ stop_kafka() {
 if [ "$COMMAND" == "bootstrap" ] && test -z "$SYSTEM"; then
   bootstrap
   exit 0
+elif [ "$COMMAND" == "standalone" ] && test -z "$SYSTEM"; then
+  standalone
+  exit 0
 elif (test -z "$COMMAND" && test -z "$SYSTEM") \
   || ( [ "$COMMAND" == "help" ] || test -z "$COMMAND" || test -z "$SYSTEM"); then
   echo
@@ -198,6 +249,7 @@ elif (test -z "$COMMAND" && test -z "$SYSTEM") \
   echo
   echo "  $ grid"
   echo "  $ grid bootstrap"
+  echo "  $ grid standalone"
   echo "  $ grid install [yarn|kafka|zookeeper|samza|all]"
   echo "  $ grid start [yarn|kafka|zookeeper|all]"
   echo "  $ grid stop [yarn|kafka|zookeeper|all]"

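The wait_for_service helper added to bin/grid in this commit probes a port once per second with nc and aborts after SERVICE_WAIT_TIMEOUT_SEC (20) attempts. The following is a minimal Python sketch of the same retry-until-budget-exhausted pattern. It assumes that a completed TCP connect is an adequate readiness signal for ZooKeeper (2181), the YARN ResourceManager (8032), the NodeManager (8042) and Kafka (9092). The function returns a boolean instead of exiting, and its host, max_attempts and interval_sec parameters are illustrative additions that do not appear in the script.

```python
import socket
import time


def wait_for_service(name, port, host="localhost", max_attempts=20, interval_sec=1.0):
    """Poll host:port until a TCP connect succeeds or the attempt budget runs out.

    Sketch of the bin/grid retry loop: one probe per interval, at most
    max_attempts probes. Returns True/False rather than calling `exit 1`.
    """
    print(f"Waiting for {name} to start...")
    for _ in range(max_attempts):
        try:
            # A completed TCP handshake means something is listening on the port.
            with socket.create_connection((host, port), timeout=1):
                print(f"{name} has started")
                return True
        except OSError:
            # Connection refused or timed out: wait, then retry.
            time.sleep(interval_sec)
    print(f"Error: timed out while waiting for {name} to start.")
    return False
```

For example, wait_for_service("kafka", 9092) would block for up to roughly 20 seconds. An open port only shows that the process has bound its listener. The broker or ResourceManager may still be initializing, so treat this as a coarse check, as the shell version does.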
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/bin/merge-pull-request.py
----------------------------------------------------------------------
diff --git a/bin/merge-pull-request.py b/bin/merge-pull-request.py
new file mode 100755
index 0000000..fc5979b
--- /dev/null
+++ b/bin/merge-pull-request.py
@@ -0,0 +1,508 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Utility for creating well-formed pull request merges and pushing them to Apache. This script is a modified version
+of the one created by the Kafka project (https://github.com/apache/kafka/blob/trunk/kafka-merge-pr.py).
+
+  Usage: ./samza-merge-pr.py (see config env vars below)
+
+This utility assumes you already have a local samza git folder and that you have added remotes corresponding to both:
+  (i) the github apache samza mirror and
+  (ii) the apache samza git repo.
+
+Note:
+    This script has been borrowed from the Apache Kafka team that can be found here:
+    https://github.com/apache/kafka/blob/trunk/kafka-merge-pr.py
+    It has been modified for the Apache Samza project.
+"""
+
+import json
+import os
+import re
+import subprocess
+import sys
+import urllib2
+
+try:
+    import jira.client
+
+    JIRA_IMPORTED = True
+except ImportError:
+    JIRA_IMPORTED = False
+
+PROJECT_NAME = "samza-hello-samza"
+
+# JIRA project name
+CAPITALIZED_PROJECT_NAME = "samza".upper()
+
+# Remote name which points to the GitHub site
+PR_REMOTE_NAME = os.environ.get("PR_REMOTE_NAME", "samza-github")
+# Remote name which points to Apache git
+PUSH_REMOTE_NAME = os.environ.get("PUSH_REMOTE_NAME", "samza-apache")
+# ASF JIRA username
+JIRA_USERNAME = os.environ.get("JIRA_USERNAME", "")
+# ASF JIRA password
+JIRA_PASSWORD = os.environ.get("JIRA_PASSWORD", "")
+
+"""
+OAuth key used for issuing requests against the GitHub API. If this is not defined, then requests
+will be unauthenticated. You should only need to configure this if you find yourself regularly
+exceeding your IP's unauthenticated request rate limit. You can create an OAuth key at
+https://github.com/settings/tokens. This script only requires the "public_repo" scope.
+"""
+GITHUB_OAUTH_KEY = os.environ.get("GITHUB_OAUTH_KEY", "")
+
+GITHUB_BASE = "https://github.com/apache/%s/pull" % (PROJECT_NAME)
+GITHUB_API_BASE = "https://api.github.com/repos/apache/%s" % (PROJECT_NAME)
+JIRA_BASE = "https://issues.apache.org/jira/browse"
+JIRA_API_BASE = "https://issues.apache.org/jira"
+# Prefix added to temporary branches
+TEMP_BRANCH_PREFIX = "PR_TOOL"
+
+
+def get_json(url):
+    try:
+        request = urllib2.Request(url)
+        if GITHUB_OAUTH_KEY:
+            request.add_header('Authorization', 'token %s' % GITHUB_OAUTH_KEY)
+        return json.load(urllib2.urlopen(request))
+    except urllib2.HTTPError as e:
+        if "X-RateLimit-Remaining" in e.headers and e.headers["X-RateLimit-Remaining"] == '0':
+            print "Exceeded the GitHub API rate limit; see the instructions in " + \
+                  "samza-merge-pr.py to configure an OAuth token for making authenticated " + \
+                  "GitHub requests."
+        else:
+            print "Unable to fetch URL, exiting: %s" % url
+        sys.exit(-1)
+
+
+def fail(msg):
+    print msg
+    clean_up()
+    sys.exit(-1)
+
+
+def run_cmd(cmd):
+    if isinstance(cmd, list):
+        # Add debugging information on the command being run.
+        print " ".join(cmd)
+        return subprocess.check_output(cmd)
+    else:
+        print cmd
+        return subprocess.check_output(cmd.split(" "))
+
+
+def continue_maybe(prompt):
+    result = raw_input("\n%s (y/n): " % prompt)
+    if result.lower() != "y":
+        fail("Okay, exiting")
+
+
+def clean_up():
+    if original_head != get_current_branch():
+        print "Restoring head pointer to %s" % original_head
+        run_cmd("git checkout %s" % original_head)
+
+    branches = run_cmd("git branch").replace(" ", "").split("\n")
+
+    for branch in filter(lambda x: x.startswith(TEMP_BRANCH_PREFIX), branches):
+        print "Deleting local branch %s" % branch
+        run_cmd("git branch -D %s" % branch)
+
+
+def get_current_branch():
+    return run_cmd("git rev-parse --abbrev-ref HEAD").replace("\n", "")
+
+def merge_pr(pr_num, target_ref, title, body, pr_repo_desc):
+    """
+    Merge the requested Pull Request number
+    Returns the Merge Hash
+
+    :param pr_num: Pull Request Number
+    :param target_ref:
+    :param title: Commit title
+    :param body:
+    :param pr_repo_desc:
+    :return:
+    """
+    pr_branch_name = "%s_MERGE_PR_%s" % (TEMP_BRANCH_PREFIX, pr_num)
+    target_branch_name = "%s_MERGE_PR_%s_%s" % (TEMP_BRANCH_PREFIX, pr_num, target_ref.upper())
+    run_cmd("git fetch %s pull/%s/head:%s" % (PR_REMOTE_NAME, pr_num, pr_branch_name))
+    run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, target_ref, target_branch_name))
+    run_cmd("git checkout %s" % target_branch_name)
+
+    had_conflicts = False
+    try:
+        run_cmd(['git', 'merge', pr_branch_name, '--squash'])
+    except Exception as e:
+        msg = "Error merging: %s\nWould you like to manually fix-up this merge?" % e
+        continue_maybe(msg)
+        msg = "Okay, please fix any conflicts and 'git add' conflicting files... Finished?"
+        continue_maybe(msg)
+        had_conflicts = True
+
+    commit_authors = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
+                              '--pretty=format:%an <%ae>']).split("\n")
+    distinct_authors = sorted(set(commit_authors),
+                              key=lambda x: commit_authors.count(x), reverse=True)
+    primary_author = raw_input(
+        "Enter primary author in the format of \"name <email>\" [%s]: " %
+        distinct_authors[0])
+    if primary_author == "":
+        primary_author = distinct_authors[0]
+
+    reviewers = raw_input(
+        "Enter reviewers in the format of \"name1 <email1>, name2 <email2>\": ").strip()
+
+    commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
+                       '--pretty=format:%h [%an] %s']).split("\n")
+
+    if len(commits) > 1:
+        result = raw_input("List pull request commits in squashed commit message? (y/n): ")
+        if result.lower() == "y":
+            should_list_commits = True
+        else:
+            should_list_commits = False
+    else:
+        should_list_commits = False
+
+    merge_message_flags = []
+
+    merge_message_flags += ["-m", title]
+    if body is not None:
+        # We remove @ symbols from the body to avoid triggering e-mails
+        # to people every time someone creates a public fork of the project.
+        merge_message_flags += ["-m", body.replace("@", "")]
+
+    authors = "\n".join(["Author: %s" % a for a in distinct_authors])
+
+    merge_message_flags += ["-m", authors]
+
+    if (reviewers != ""):
+        merge_message_flags += ["-m", "Reviewers: %s" % reviewers]
+
+    if had_conflicts:
+        committer_name = run_cmd("git config --get user.name").strip()
+        committer_email = run_cmd("git config --get user.email").strip()
+        message = "This patch had conflicts when merged, resolved by\nCommitter: %s <%s>" % (
+            committer_name, committer_email)
+        merge_message_flags += ["-m", message]
+
+    # The string "Closes #%s" string is required for GitHub to correctly close the PR
+    close_line = "Closes #%s from %s" % (pr_num, pr_repo_desc)
+    if should_list_commits:
+        close_line += " and squashes the following commits:"
+    merge_message_flags += ["-m", close_line]
+
+    if should_list_commits:
+        merge_message_flags += ["-m", "\n".join(commits)]
+
+    run_cmd(['git', 'commit', '--author="%s"' % primary_author] + merge_message_flags)
+
+    continue_maybe("Merge complete (local ref %s). Push to %s?" % (
+        target_branch_name, PUSH_REMOTE_NAME))
+
+    try:
+        run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, target_branch_name, target_ref))
+    except Exception as e:
+        clean_up()
+        fail("Exception while pushing: %s" % e)
+
+    merge_hash = run_cmd("git rev-parse %s" % target_branch_name)[:8]
+    clean_up()
+    print("Pull request #%s merged!" % pr_num)
+    print("Merge hash: %s" % merge_hash)
+    return merge_hash
+
+
+def cherry_pick(pr_num, merge_hash, default_branch):
+    pick_ref = raw_input("Enter a branch name [%s]: " % default_branch)
+    if pick_ref == "":
+        pick_ref = default_branch
+
+    pick_branch_name = "%s_PICK_PR_%s_%s" % (TEMP_BRANCH_PREFIX, pr_num, pick_ref.upper())
+
+    run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, pick_ref, pick_branch_name))
+    run_cmd("git checkout %s" % pick_branch_name)
+
+    try:
+        run_cmd("git cherry-pick -sx %s" % merge_hash)
+    except Exception as e:
+        msg = "Error cherry-picking: %s\nWould you like to manually fix-up this merge?" % e
+        continue_maybe(msg)
+        msg = "Okay, please fix any conflicts and finish the cherry-pick. Finished?"
+        continue_maybe(msg)
+
+    continue_maybe("Pick complete (local ref %s). Push to %s?" % (
+        pick_branch_name, PUSH_REMOTE_NAME))
+
+    try:
+        run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, pick_branch_name, pick_ref))
+    except Exception as e:
+        clean_up()
+        fail("Exception while pushing: %s" % e)
+
+    pick_hash = run_cmd("git rev-parse %s" % pick_branch_name)[:8]
+    clean_up()
+
+    print("Pull request #%s picked into %s!" % (pr_num, pick_ref))
+    print("Pick hash: %s" % pick_hash)
+    return pick_ref
+
+
+def fix_version_from_branch(branch, versions):
+    """
+    Returns fix-version from the branch names
+    :param branch:
+    :param versions: List of sorted (newest->oldest) list of unrelease versions (includes "master" branch)
+    :return: Returns "master" if the current branch is master. Otherwise, it returns the oldest branch version
+            that starts with "master". Else, it returns None.
+    """
+    if branch == "master":
+        if len(versions) > 0:
+            return versions[0]
+        else:
+            return None
+    else:
+        v = filter(lambda x: x.startswith(branch), versions)
+        if len(v) > 0:
+            return v[-1]
+        else:
+            print("Could not find branch %s in versions: %s" % (branch, versions))
+            return None
+
+
+def resolve_jira_issue(title, merge_branches, comment):
+    """
+    Updates the assignee (if not already provided) and fix-versions before resolving the JIRA
+
+    :param title: Standardized commit title
+    :param merge_branches: Default fix-versions for this JIRA
+    :param comment: Comment added to the JIRA
+    """
+    asf_jira = jira.client.JIRA({'server': JIRA_API_BASE},
+                                basic_auth=(JIRA_USERNAME, JIRA_PASSWORD))
+    default_jira_id = ""
+    jira_ids = re.findall("%s-[0-9]{4,5}" % CAPITALIZED_PROJECT_NAME, title)
+    if len(jira_ids) > 0:
+        default_jira_id = jira_ids[0]
+
+    jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id)
+    if jira_id == "":
+        jira_id = default_jira_id
+
+    try:
+        issue = asf_jira.issue(jira_id)
+    except Exception as e:
+        fail("ASF JIRA could not find %s\n%s" % (jira_id, e))
+
+    cur_status = issue.fields.status.name
+    cur_summary = issue.fields.summary
+    cur_assignee = issue.fields.assignee
+    if cur_assignee is None:
+        cur_assignee = "NOT ASSIGNED!!!"
+    else:
+        cur_assignee = cur_assignee.displayName
+
+    if cur_status == "Resolved" or cur_status == "Closed":
+        fail("JIRA issue %s already has status '%s'" % (jira_id, cur_status))
+    print ("=== JIRA %s ===" % jira_id)
+    print ("summary\t\t%s\nassignee\t%s\nstatus\t\t%s\nurl\t\t%s/%s\n" % (
+        cur_summary, cur_assignee, cur_status, JIRA_BASE, jira_id))
+
+    versions = asf_jira.project_versions(CAPITALIZED_PROJECT_NAME)
+    versions = sorted(versions, key=lambda x: x.name, reverse=True)
+    versions = filter(lambda x: x.raw['released'] is False, versions)
+
+    version_names = map(lambda x: x.name, versions)
+    default_fix_versions = map(lambda x: fix_version_from_branch(x, version_names), merge_branches)
+    default_fix_versions = filter(lambda x: x != None, default_fix_versions)
+    default_fix_versions = ",".join(default_fix_versions)
+
+    fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % default_fix_versions)
+    if fix_versions == "":
+        fix_versions = default_fix_versions
+    fix_versions = fix_versions.replace(" ", "").split(",")
+
+    def get_version_json(version_str):
+        return filter(lambda v: v.name == version_str, versions)[0].raw
+
+    jira_fix_versions = map(lambda v: get_version_json(v), fix_versions)
+
+    resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0]
+    resolution = filter(lambda r: r.raw['name'] == "Fixed", asf_jira.resolutions())[0]
+    asf_jira.transition_issue(
+        jira_id, resolve["id"], fixVersions=jira_fix_versions,
+        comment=comment, resolution={'id': resolution.raw['id']})
+
+    print "Successfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions)
+
+
+def standardize_jira_ref(text):
+    """
+    Standardize the jira reference commit message prefix to "PROJECT_NAME-XXX: Issue"
+    >>> standardize_jira_ref("%s-5954: Top by key" % CAPITALIZED_PROJECT_NAME)
+    'SAMZA-5954: Top by key'
+    >>> standardize_jira_ref("%s-5821: ParquetRelation2 CTAS should check if delete is successful" % PROJECT_NAME)
+    'SAMZA-5821: ParquetRelation2 CTAS should check if delete is successful'
+    >>> standardize_jira_ref("%s-4123 [WIP] Show new dependencies added in pull requests" % PROJECT_NAME)
+    'SAMZA-4123: [WIP] Show new dependencies added in pull requests'
+    >>> standardize_jira_ref("%s  5954: Top by key" % PROJECT_NAME)
+    'SAMZA-5954: Top by key'
+    >>> standardize_jira_ref("%s-979 a LRU scheduler for load balancing in TaskSchedulerImpl" % PROJECT_NAME)
+    'SAMZA-979: a LRU scheduler for load balancing in TaskSchedulerImpl'
+    >>> standardize_jira_ref("%s-1094 Support MiMa for reporting binary compatibility across versions." % CAPITALIZED_PROJECT_NAME)
+    'SAMZA-1094: Support MiMa for reporting binary compatibility across versions.'
+    >>> standardize_jira_ref("[WIP] %s-1146: Vagrant support" % CAPITALIZED_PROJECT_NAME)
+    'SAMZA-1146: [WIP] Vagrant support'
+    >>> standardize_jira_ref("%s-1032. If Yarn app fails before registering, app master stays aroun..." % PROJECT_NAME)
+    'SAMZA-1032: If Yarn app fails before registering, app master stays aroun...'
+    >>> standardize_jira_ref("%s-6250 %s-6146 %s-5911: Types are now reserved words in DDL parser." % (PROJECT_NAME, PROJECT_NAME, CAPITALIZED_PROJECT_NAME))
+    'SAMZA-6250 SAMZA-6146 SAMZA-5911: Types are now reserved words in DDL parser.'
+    >>> standardize_jira_ref("Additional information for users building from source code")
+    'Additional information for users building from source code'
+
+    :param text: Text provided that will be used a commit message
+    :return: Standardized commit message that includes the JIRA number as well
+    """
+    jira_refs = []
+    components = []
+
+    # Extract JIRA ref(s):
+    pattern = re.compile(r'(%s[-\s]*[0-9]{3,6})+' % CAPITALIZED_PROJECT_NAME, re.IGNORECASE)
+    for ref in pattern.findall(text):
+        # Add brackets, replace spaces with a dash, & convert to uppercase
+        jira_refs.append(re.sub(r'\s+', '-', ref.upper()))
+        text = text.replace(ref, '')
+
+    # Extract project name component(s):
+    # Look for alphanumeric chars, spaces, dashes, periods, and/or commas
+    pattern = re.compile(r'(\[[\w\s,-\.]+\])', re.IGNORECASE)
+    for component in pattern.findall(text):
+        components.append(component.upper())
+        text = text.replace(component, '')
+
+    # Cleanup any remaining symbols:
+    pattern = re.compile(r'^\W+(.*)', re.IGNORECASE)
+    if (pattern.search(text) is not None):
+        text = pattern.search(text).groups()[0]
+
+    # Assemble full text (JIRA ref(s), module(s), remaining text)
+    jira_prefix = ' '.join(jira_refs).strip()
+    if jira_prefix:
+        jira_prefix = jira_prefix + ": "
+    clean_text = jira_prefix + ' '.join(components).strip() + " " + text.strip()
+
+    # Replace multiple spaces with a single space, e.g. if no jira refs and/or components were included
+    clean_text = re.sub(r'\s+', ' ', clean_text.strip())
+
+    return clean_text
+
+
+def main():
+    global original_head
+
+    original_head = get_current_branch()
+
+    latest_branch = "master"
+
+    pr_num = raw_input("Which pull request would you like to merge? (e.g. 34): ")
+    pr = get_json("%s/pulls/%s" % (GITHUB_API_BASE, pr_num))
+    pr_events = get_json("%s/issues/%s/events" % (GITHUB_API_BASE, pr_num))
+
+    url = pr["url"]
+
+    pr_title = pr["title"]
+    commit_title = raw_input("Commit title [%s]: " % pr_title.encode("utf-8")).decode("utf-8")
+    if commit_title == "":
+        commit_title = pr_title
+
+    # Decide whether to use the modified title or not
+    modified_title = standardize_jira_ref(commit_title)
+    if modified_title != commit_title:
+        print "I've re-written the title as follows to match the standard format:"
+        print "Original: %s" % commit_title
+        print "Modified: %s" % modified_title
+        result = raw_input("Would you like to use the modified title? (y/n): ")
+        if result.lower() == "y":
+            commit_title = modified_title
+            print "Using modified title:"
+        else:
+            print "Using original title:"
+        print commit_title
+
+    body = pr["body"]
+    target_ref = pr["base"]["ref"]
+    user_login = pr["user"]["login"]
+    base_ref = pr["head"]["ref"]
+    pr_repo_desc = "%s/%s" % (user_login, base_ref)
+
+    # Merged pull requests don't appear as merged in the GitHub API;
+    # Instead, they're closed by asfgit.
+    merge_commits = \
+        [e for e in pr_events if e["actor"]["login"] == "asfgit" and e["event"] == "closed"]
+
+    if merge_commits:
+        merge_hash = merge_commits[0]["commit_id"]
+        message = get_json("%s/commits/%s" % (GITHUB_API_BASE, merge_hash))["commit"]["message"]
+
+        print "Pull request %s has already been merged, assuming you want to backport" % pr_num
+        commit_is_downloaded = run_cmd(['git', 'rev-parse', '--quiet', '--verify',
+                                        "%s^{commit}" % merge_hash]).strip() != ""
+        if not commit_is_downloaded:
+            fail("Couldn't find any merge commit for #%s, you may need to update HEAD." % pr_num)
+
+        print "Found commit %s:\n%s" % (merge_hash, message)
+        cherry_pick(pr_num, merge_hash, latest_branch)
+        sys.exit(0)
+
+    if not bool(pr["mergeable"]):
+        msg = "Pull request %s is not mergeable in its current form.\n" % pr_num + \
+              "Continue? (experts only!)"
+        continue_maybe(msg)
+
+    print ("\n=== Pull Request #%s ===" % pr_num)
+    print ("PR title\t%s\nCommit title\t%s\nSource\t\t%s\nTarget\t\t%s\nURL\t\t%s" % (
+        pr_title, commit_title, pr_repo_desc, target_ref, url))
+    continue_maybe("Proceed with merging pull request #%s?" % pr_num)
+
+    merged_refs = [target_ref]
+
+    merge_hash = merge_pr(pr_num, target_ref, commit_title, body, pr_repo_desc)
+
+    pick_prompt = "Would you like to pick %s into another branch?" % merge_hash
+    while raw_input("\n%s (y/n): " % pick_prompt).lower() == "y":
+        merged_refs = merged_refs + [cherry_pick(pr_num, merge_hash, latest_branch)]
+
+    if JIRA_IMPORTED:
+        if JIRA_USERNAME and JIRA_PASSWORD:
+            continue_maybe("Would you like to update an associated JIRA?")
+            jira_comment = "Issue resolved by pull request %s\n[%s/%s]" % (pr_num, GITHUB_BASE, pr_num)
+            resolve_jira_issue(commit_title, merged_refs, jira_comment)
+        else:
+            print "JIRA_USERNAME and JIRA_PASSWORD not set"
+            print "Exiting without trying to close the associated JIRA."
+    else:
+        print "Could not find jira-python library. Run 'sudo pip install jira' to install."
+        print "Exiting without trying to close the associated JIRA."
+
+
+if __name__ == "__main__":
+    main()

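standardize_jira_ref in the script above normalizes a commit title in three passes. It extracts JIRA references, then extracts bracketed tags such as [WIP], then strips leftover leading punctuation, and finally reassembles the title as "REFS: [TAGS] text". Below is a Python 3 sketch of the same passes, assuming the JIRA key is "SAMZA" (the script's CAPITALIZED_PROJECT_NAME). Several doctests in the committed docstring interpolate PROJECT_NAME, which this script sets to "samza-hello-samza". For such inputs, "samza-hello-samza-5821: ParquetRelation2 ..." normalizes to "SAMZA-5821: samza-hello-: ParquetRelation2 ..." rather than the documented result. The sketch therefore passes the key in directly.

```python
import re


def standardize_jira_ref(text, project="SAMZA"):
    """Rewrite a commit title to the "PROJECT-123: [TAG] text" form (sketch)."""
    jira_refs = []
    components = []

    # Pass 1: pull out ticket references such as "samza-123" or "SAMZA  5954",
    # normalizing whitespace to a dash and upper-casing them.
    ref_pattern = re.compile(r"(%s[-\s]*[0-9]{3,6})+" % re.escape(project), re.IGNORECASE)
    for ref in ref_pattern.findall(text):
        jira_refs.append(re.sub(r"\s+", "-", ref.upper()))
        text = text.replace(ref, "")

    # Pass 2: pull out bracketed tags such as "[WIP]".
    for component in re.findall(r"(\[[\w\s,\-.]+\])", text):
        components.append(component.upper())
        text = text.replace(component, "")

    # Pass 3: drop punctuation left at the front, e.g. ": " or ". ".
    match = re.search(r"^\W+(.*)", text)
    if match:
        text = match.group(1)

    prefix = " ".join(jira_refs)
    if prefix:
        prefix += ": "
    clean = prefix + " ".join(components) + " " + text.strip()
    # Collapse runs of whitespace created by the removals above.
    return re.sub(r"\s+", " ", clean.strip())
```

For example, standardize_jira_ref("[WIP] SAMZA-1146: Vagrant support") yields "SAMZA-1146: [WIP] Vagrant support". A title with no reference or tag comes back unchanged.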
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/bin/run-wikipedia-zk-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-wikipedia-zk-application.sh b/bin/run-wikipedia-zk-application.sh
new file mode 100755
index 0000000..6feea52
--- /dev/null
+++ b/bin/run-wikipedia-zk-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=$(pwd)
+base_dir=$(dirname "$0")/..
+cd "$base_dir"
+base_dir=$(pwd)
+cd "$home_dir"
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p "$EXECUTION_PLAN_DIR"
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh samza.examples.wikipedia.application.WikipediaZkLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-application-local-runner.properties

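The first lines of run-wikipedia-zk-application.sh normalize base_dir to the absolute parent of the script's directory. The JAVA_OPTS line adds a log4j configuration only when the caller has not already supplied one. Below is a rough Python equivalent of both steps, for illustration only. resolve_base_dir and with_log4j are hypothetical names. os.path.abspath normalizes the path lexically, so its result may differ from cd/pwd in edge cases involving symlinks.

```python
import os


def resolve_base_dir(script_path):
    # Like `cd $(dirname $0)/..; pwd`: absolute path of the script's parent directory
    return os.path.abspath(os.path.join(os.path.dirname(script_path), ".."))


def with_log4j(java_opts, script_dir):
    # Respect a log4j configuration the caller already passed in JAVA_OPTS
    if "-Dlog4j.configuration" in java_opts:
        return java_opts
    # Otherwise append the console config that ships next to the script
    return "%s -Dlog4j.configuration=file:%s/log4j-console.xml" % (java_opts, script_dir)
```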
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 776ff36..ec451d5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -30,6 +30,7 @@ task wrapper(type: Wrapper) {
 version = "$SAMZA_VERSION"
 
 repositories {
+    mavenLocal()
     mavenCentral()
     maven { url "https://repository.apache.org/content/groups/public" }
 }
@@ -70,6 +71,7 @@ task distTar(dependsOn: build, type: Tar) {
             include "wikipedia-feed.properties"
             include "wikipedia-parser.properties"
             include "wikipedia-stats.properties"
+            include "wikipedia-application.properties"
 
             // expand the Maven tokens with Gradle equivalents.  Also change 'target' (Maven) to 'build/distributions' (Gradle)
             filter { String line ->

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 8e36b98..8b71e61 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,8 +17,8 @@
  * under the License.
  */
 
-SAMZA_VERSION=0.12.0
-KAFKA_VERSION=0.10.0.1
+SAMZA_VERSION=0.13.0
+KAFKA_VERSION=0.10.1.1
 HADOOP_VERSION=2.6.1
 
 SLF4J_VERSION = 1.7.7

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6b02f6f..da7ec90 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
 
   <groupId>org.apache.samza</groupId>
   <artifactId>hello-samza</artifactId>
-  <version>0.12.0</version>
+  <version>0.13.0</version>
   <packaging>jar</packaging>
   <name>Samza Example</name>
   <description>
@@ -81,7 +81,7 @@ under the License.
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
-      <version>0.10.0.1</version>
+      <version>0.10.1.1</version>
     </dependency>
     <dependency>
       <groupId>org.schwering</groupId>
@@ -143,7 +143,7 @@ under the License.
   <properties>
     <!-- maven specific properties -->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <samza.version>0.12.0</samza.version>
+    <samza.version>0.13.0</samza.version>
     <hadoop.version>2.6.1</hadoop.version>
   </properties>
 
@@ -240,8 +240,8 @@ under the License.
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.1</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
       <plugin>

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 3f2e4a8..c04ace0 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -28,28 +28,25 @@
         <include>NOTICE*</include>
       </includes>
     </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/config</directory>
+      <includes>
+        <include>*.properties</include>
+      </includes>
+      <outputDirectory>config</outputDirectory>
+      <!-- filtered=true, so we do variable expansion so the yarn package path
+      always points to the correct spot on any machine -->
+      <filtered>true</filtered>
+    </fileSet>
   </fileSets>
   <files>
     <file>
       <source>${basedir}/src/main/resources/log4j.xml</source>
       <outputDirectory>lib</outputDirectory>
     </file>
-    <!-- filtered=true, so we do variable expansion so the yarn package path
-      always points to the correct spot on any machine -->
     <file>
-      <source>${basedir}/src/main/config/wikipedia-feed.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
-    </file>
-    <file>
-      <source>${basedir}/src/main/config/wikipedia-parser.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
-    </file>
-    <file>
-      <source>${basedir}/src/main/config/wikipedia-stats.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
+      <source>${basedir}/bin/run-wikipedia-zk-application.sh</source>
+      <outputDirectory>bin</outputDirectory>
     </file>
   </files>
   <dependencySets>

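The src.xml change replaces one <file> entry per Wikipedia config with a single filtered <fileSet> that includes every *.properties file under src/main/config. As a result, the new cookbook and application configs are packaged without any further assembly edits. The sketch below approximates which files that pattern selects, using Python's fnmatch. Maven itself uses Ant-style patterns. The two agree for plain file names like these, but this is not Maven's matcher.

```python
import fnmatch


def packaged_configs(filenames):
    # Keep only names matching the fileSet's <include>*.properties</include>
    return sorted(name for name in filenames if fnmatch.fnmatchcase(name, "*.properties"))
```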
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/pageview-adclick-joiner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-adclick-joiner.properties b/src/main/config/pageview-adclick-joiner.properties
new file mode 100644
index 0000000..81ec3f6
--- /dev/null
+++ b/src/main/config/pageview-adclick-joiner.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-adclick-joiner
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewAdClickJoiner
+task.inputs=kafka.pageview-join-input,kafka.adclick-join-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2

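Each cookbook config lists its inputs in task.inputs as comma-separated system.stream names. The sketch below reads such a file and splits those entries. It assumes simple key=value lines with no continuations or escapes; java.util.Properties handles many more cases. Each entry is split at the first '.', which we believe matches how Samza separates the system name from the stream name. parse_properties and task_inputs are illustrative helpers.

```python
def parse_properties(text):
    # Minimal reader: key=value lines, '#' comments, blank lines; no escapes or continuations
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def task_inputs(props):
    # "kafka.pageview-join-input" -> ("kafka", "pageview-join-input"), split at the first '.'
    pairs = []
    for entry in props.get("task.inputs", "").split(","):
        entry = entry.strip()
        if entry:
            system, _, stream = entry.partition(".")
            pairs.append((system, stream))
    return pairs


# Excerpt of pageview-adclick-joiner.properties
CONFIG = """
# Task
app.class=samza.examples.cookbook.PageViewAdClickJoiner
task.inputs=kafka.pageview-join-input,kafka.adclick-join-input
task.window.ms=2000
"""
```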
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/pageview-filter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-filter.properties b/src/main/config/pageview-filter.properties
new file mode 100644
index 0000000..b9e8d2a
--- /dev/null
+++ b/src/main/config/pageview-filter.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-filter
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewFilterApp
+task.inputs=kafka.pageview-filter-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/pageview-sessionizer.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-sessionizer.properties b/src/main/config/pageview-sessionizer.properties
new file mode 100644
index 0000000..847aa87
--- /dev/null
+++ b/src/main/config/pageview-sessionizer.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-sessionizer
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewSessionizerApp
+task.inputs=kafka.pageview-session-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/tumbling-pageview-counter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/tumbling-pageview-counter.properties b/src/main/config/tumbling-pageview-counter.properties
new file mode 100644
index 0000000..09fb131
--- /dev/null
+++ b/src/main/config/tumbling-pageview-counter.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=tumbling-pageview-counter
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.TumblingPageViewCounterApp
+task.inputs=kafka.pageview-tumbling-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/wikipedia-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties
new file mode 100644
index 0000000..1911e68
--- /dev/null
+++ b/src/main/config/wikipedia-application-local-runner.properties
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.name=wikipedia-application
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.default.system=kafka
+job.coordinator.zk.connect=localhost:2181
+
+# Task/Application
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
+systems.kafka.default.stream.samza.msg.serde=json
+
+# Streams
+streams.en-wikipedia.samza.system=wikipedia
+streams.en-wikipedia.samza.physical.name=#en.wikipedia
+
+streams.en-wiktionary.samza.system=wikipedia
+streams.en-wiktionary.samza.physical.name=#en.wiktionary
+
+streams.en-wikinews.samza.system=wikipedia
+streams.en-wikinews.samza.physical.name=#en.wikinews
+
+# Key-value storage
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
+stores.wikipedia-stats.key.serde=string
+stores.wikipedia-stats.msg.serde=integer
+

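In the application configs, streams.<stream-id>.samza.system and streams.<stream-id>.samza.physical.name map a logical stream id to its system and physical name. The Wikipedia IRC channels need this mapping because their physical names start with '#'. The sketch below performs that lookup. It assumes that streams without these keys fall back to job.default.system and to the stream id itself as the physical name. resolve_stream is a hypothetical helper; check Samza's StreamConfig for the exact rules.

```python
def resolve_stream(props, stream_id):
    # Assumed fallbacks: job.default.system, and the stream id as the physical name
    prefix = "streams.%s." % stream_id
    system = props.get(prefix + "samza.system", props.get("job.default.system"))
    physical = props.get(prefix + "samza.physical.name", stream_id)
    return system, physical


# Excerpt of wikipedia-application-local-runner.properties as a dict
PROPS = {
    "job.default.system": "kafka",
    "streams.en-wikipedia.samza.system": "wikipedia",
    "streams.en-wikipedia.samza.physical.name": "#en.wikipedia",
}
```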
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/wikipedia-application.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application.properties b/src/main/config/wikipedia-application.properties
new file mode 100644
index 0000000..aeb8069
--- /dev/null
+++ b/src/main/config/wikipedia-application.properties
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Application / Job
+app.class=samza.examples.wikipedia.application.WikipediaApplication
+app.runner.class=org.apache.samza.runtime.RemoteApplicationRunner
+
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=wikipedia-application
+job.default.system=kafka
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
+systems.kafka.default.stream.samza.msg.serde=json
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Streams which are not on default system or have special characters in the physical name.
+streams.en-wikipedia.samza.system=wikipedia
+streams.en-wikipedia.samza.physical.name=#en.wikipedia
+
+streams.en-wiktionary.samza.system=wikipedia
+streams.en-wiktionary.samza.physical.name=#en.wiktionary
+
+streams.en-wikinews.samza.system=wikipedia
+streams.en-wikinews.samza.physical.name=#en.wikinews
+
+# Key-value storage
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
+stores.wikipedia-stats.key.serde=string
+stores.wikipedia-stats.msg.serde=integer
+
+# Metrics
+metrics.reporters=snapshot,jmx
+metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+metrics.reporter.snapshot.stream=kafka.metrics
+metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
+

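metrics.reporters names each reporter. Each name is then resolved through its own metrics.reporter.<name>.class key, so the four metrics lines work as a unit. The sketch below shows that lookup. reporter_classes is a hypothetical helper, and the KeyError raised for a missing class key stands in for a configuration error.

```python
def reporter_classes(props):
    # metrics.reporters is a comma-separated list of reporter names
    names = [n.strip() for n in props.get("metrics.reporters", "").split(",") if n.strip()]
    # each name needs a matching metrics.reporter.<name>.class entry
    return dict((n, props["metrics.reporter.%s.class" % n]) for n in names)


# Excerpt of wikipedia-application.properties as a dict
PROPS = {
    "metrics.reporters": "snapshot,jmx",
    "metrics.reporter.snapshot.class": "org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory",
    "metrics.reporter.snapshot.stream": "kafka.metrics",
    "metrics.reporter.jmx.class": "org.apache.samza.metrics.reporter.JmxReporterFactory",
}
```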
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/wikipedia-parser.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-parser.properties b/src/main/config/wikipedia-parser.properties
index 6d1e3df..e8f3fa0 100644
--- a/src/main/config/wikipedia-parser.properties
+++ b/src/main/config/wikipedia-parser.properties
@@ -26,12 +26,6 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-
 task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
 task.inputs=kafka.wikipedia-raw
 
-# Metrics
-metrics.reporters=snapshot,jmx
-metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
-metrics.reporter.snapshot.stream=kafka.metrics
-metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
-
 # Serializers
 serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
 serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/wikipedia-stats.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties
index 13bab64..0a1cf31 100644
--- a/src/main/config/wikipedia-stats.properties
+++ b/src/main/config/wikipedia-stats.properties
@@ -26,10 +26,12 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-
 task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask
 task.inputs=kafka.wikipedia-edits
 task.window.ms=10000
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka
-# Normally, this would be 3, but we have only one broker.
-task.checkpoint.replication.factor=1
+
+# Metrics
+metrics.reporters=snapshot,jmx
+metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+metrics.reporter.snapshot.stream=kafka.metrics
+metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
 
 # Serializers
 serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/cookbook/AdClick.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java
new file mode 100644
index 0000000..2d15cec
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/AdClick.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.cookbook;
+
+/**
+ * Represents an ad click event.
+ */
+public class AdClick {
+  /**
+   * A unique identifier for the ad
+   */
+  private final String adId;
+  /**
+   * The user that clicked the ad
+   */
+  private final String userId;
+  /**
+   * The id of the page that the ad was served from
+   */
+  private final String pageId;
+
+  public AdClick(String message) {
+    String[] adClickFields = message.split(",");
+    this.adId = adClickFields[0];
+    this.userId = adClickFields[1];
+    this.pageId = adClickFields[2];
+  }
+
+  public String getAdId() {
+    return adId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getPageId() {
+    return pageId;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/cookbook/PageView.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java
new file mode 100644
index 0000000..7803db7
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageView.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+/**
+ * Represents a page view event.
+ */
+class PageView {
+  /**
+   * The user that viewed the page
+   */
+  private final String userId;
+  /**
+   * The region that the page was viewed from
+   */
+  private final String country;
+  /**
+   * A trackingId for the page
+   */
+  private final String pageId;
+
+  /**
+   * Constructs a {@link PageView} from the provided string.
+   *
+   * @param message in the following CSV format: userId,country,pageId
+   */
+  PageView(String message) {
+    String[] pageViewFields = message.split(",");
+    userId = pageViewFields[0];
+    country = pageViewFields[1];
+    pageId = pageViewFields[2];
+  }
+
+  String getUserId() {
+    return userId;
+  }
+
+  String getCountry() {
+    return country;
+  }
+
+  String getPageId() {
+    return pageId;
+  }
+}

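PageView and AdClick both parse a three-field CSV message by position: userId,country,pageId for page views and adId,userId,pageId for ad clicks. The sketch below shows the same positional parsing in Python, for illustration only. The namedtuples and function names are hypothetical and are not part of the example code.

```python
from collections import namedtuple

PageView = namedtuple("PageView", ["user_id", "country", "page_id"])
AdClick = namedtuple("AdClick", ["ad_id", "user_id", "page_id"])


def parse_page_view(message):
    # Same field order as the Java constructor: userId,country,pageId
    fields = message.split(",")
    return PageView(fields[0], fields[1], fields[2])


def parse_ad_click(message):
    # Same field order as the Java constructor: adId,userId,pageId
    fields = message.split(",")
    return AdClick(fields[0], fields[1], fields[2])
```

Note that Java's String.split drops trailing empty fields, but Python's str.split keeps them. A message such as "a,b," therefore parses here with an empty pageId, whereas the Java constructor would throw an ArrayIndexOutOfBoundsException.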
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
new file mode 100644
index 0000000..94c7bc3
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.function.Function;
+
+/**
+ * In this example, we join a stream of page views with a stream of ad clicks. This is helpful, for instance, for
+ * analyzing which pages served an ad that was clicked.
+ *
+ * <p> Concepts covered: Performing stream to stream Joins.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ *   <li>
+ *     Ensure that the topics "pageview-join-input" and "adclick-join-input" are created (run the command below once per topic) <br/>
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the ./deploy/samza/bin/run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ *     --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties
+ *   </li>
+ *   <li>
+ *     Produce some messages to the "pageview-join-input" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/>
+ *     user1,india,google.com <br/>
+ *     user2,china,yahoo.com
+ *   </li>
+ *   <li>
+ *     Produce some messages to the "adclick-join-input" topic with matching pageIds <br/>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic adclick-join-input --broker-list localhost:9092 <br/>
+ *     adClickId1,user1,google.com <br/>
+ *     adClickId2,user1,yahoo.com
+ *   </li>
+ *   <li>
+ *     Consume messages from the "pageview-adclick-join-output" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output <br/>
+ *     --property print.key=true
+ *   </li>
+ * </ol>
+ *
+ */
+public class PageViewAdClickJoiner implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PageViewAdClickJoiner.class);
+  private static final String INPUT_TOPIC1 = "pageview-join-input";
+  private static final String INPUT_TOPIC2 = "adclick-join-input";
+
+  private static final String OUTPUT_TOPIC = "pageview-adclick-join-output";
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC1, (k, v) -> v);
+    MessageStream<String> adClicks = graph.<String, String, String>getInputStream(INPUT_TOPIC2, (k, v) -> v);
+
+    OutputStream<String, String, String> outputStream = graph
+        .getOutputStream(OUTPUT_TOPIC, m -> "", m -> m);
+
+    Function<String, String> pageViewKeyFn = pageView -> new PageView(pageView).getPageId();
+    Function<String, String> adClickKeyFn = adClick -> new AdClick(adClick).getPageId();
+
+    MessageStream<String> pageViewRepartitioned = pageViews.partitionBy(pageViewKeyFn);
+    MessageStream<String> adClickRepartitioned = adClicks.partitionBy(adClickKeyFn);
+
+    pageViewRepartitioned.join(adClickRepartitioned, new JoinFunction<String, String, String, String>() {
+
+      @Override
+      public String apply(String pageViewMsg, String adClickMsg) {
+        PageView pageView = new PageView(pageViewMsg);
+        AdClick adClick = new AdClick(adClickMsg);
+        String joinResult = String.format("%s,%s,%s", pageView.getPageId(), pageView.getCountry(), adClick.getAdId());
+        return joinResult;
+      }
+
+      @Override
+      public String getFirstKey(String msg) {
+        return new PageView(msg).getPageId();
+      }
+
+      @Override
+      public String getSecondKey(String msg) {
+        return new AdClick(msg).getPageId();
+      }
+    }, Duration.ofMinutes(3)).sendTo(outputStream);
+  }
+}
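
The join above keys both streams by page id and pairs messages that arrive within the 3-minute TTL. The plain-Java sketch below simulates that behavior in memory so it can run without Samza. It is an approximation, not Samza's join operator: it assumes each side keeps only its latest message per key, and it assumes the layouts "userId,country,pageId" for page views and "adId,userId,pageId" for ad clicks, inferred from the sample input. The class PageViewAdClickJoinSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory simulation of a keyed stream-stream join with a TTL.
// Assumption: each side retains only its latest message per key.
final class PageViewAdClickJoinSketch {

  // A message together with the time (in ms) at which it arrived.
  private static final class Entry {
    final String msg;
    final long arrivalMs;

    Entry(String msg, long arrivalMs) {
      this.msg = msg;
      this.arrivalMs = arrivalMs;
    }
  }

  private final long ttlMs;
  private final Map<String, Entry> latestPageView = new HashMap<>();
  private final Map<String, Entry> latestAdClick = new HashMap<>();
  final List<String> output = new ArrayList<>();

  PageViewAdClickJoinSketch(long ttlMs) {
    this.ttlMs = ttlMs;
  }

  // Assumed page view layout "userId,country,pageId".
  static String pageViewKey(String pageView) {
    return pageView.split(",")[2];
  }

  // Assumed ad click layout "adId,userId,pageId".
  static String adClickKey(String adClick) {
    return adClick.split(",")[2];
  }

  // Same output format as the example's JoinFunction.apply: "pageId,country,adId".
  static String joinResult(String pageView, String adClick) {
    String[] pv = pageView.split(",");
    String[] ac = adClick.split(",");
    return String.format("%s,%s,%s", pv[2], pv[1], ac[0]);
  }

  // True if the stored message is still within the TTL at time nowMs.
  private boolean isFresh(Entry e, long nowMs) {
    return e != null && nowMs - e.arrivalMs <= ttlMs;
  }

  void onPageView(String msg, long nowMs) {
    String key = pageViewKey(msg);
    latestPageView.put(key, new Entry(msg, nowMs));
    Entry click = latestAdClick.get(key);
    if (isFresh(click, nowMs)) {
      output.add(joinResult(msg, click.msg));
    }
  }

  void onAdClick(String msg, long nowMs) {
    String key = adClickKey(msg);
    latestAdClick.put(key, new Entry(msg, nowMs));
    Entry view = latestPageView.get(key);
    if (isFresh(view, nowMs)) {
      output.add(joinResult(view.msg, msg));
    }
  }

  public static void main(String[] args) {
    PageViewAdClickJoinSketch join = new PageViewAdClickJoinSketch(3 * 60_000L); // 3-minute TTL
    join.onPageView("user1,india,google.com", 0L);
    join.onPageView("user2,china,yahoo.com", 1_000L);
    join.onAdClick("adClickId1,user1,google.com", 2_000L);  // joins with the google.com page view
    join.onAdClick("adClickId2,user1,yahoo.com", 600_000L); // the yahoo.com page view has expired
    System.out.println(join.output); // [google.com,india,adClickId1]
  }
}
```

Only the first ad click produces output here, because the second one arrives ten minutes after the matching page view, well past the TTL.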

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
new file mode 100644
index 0000000..cb39553
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Function;
+
+/**
+ * In this example, we demonstrate re-partitioning a stream of page views and filtering out some bad events in the stream.
+ *
+ * <p>Concepts covered: Using stateless operators on a stream, Re-partitioning a stream.
+ *
+ * To run this example:
+ *
+ * <ol>
+ *   <li>
+ *     Ensure that the topic "pageview-filter-input" is created  <br/>
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the ./deploy/samza/bin/run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ *     --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties
+ *   </li>
+ *   <li>
+ *     Produce some messages to the "pageview-filter-input" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-filter-input --broker-list localhost:9092 <br/>
+ *     user1,india,google.com <br/>
+ *     user2,china,yahoo.com
+ *   </li>
+ *   <li>
+ *     Consume messages from the "pageview-filter-output" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output --property print.key=true
+ *   </li>
+ * </ol>
+ *
+ */
+public class PageViewFilterApp implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PageViewFilterApp.class);
+  private static final String FILTER_KEY = "badKey";
+  private static final String INPUT_TOPIC = "pageview-filter-input";
+  private static final String OUTPUT_TOPIC = "pageview-filter-output";
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
+
+    Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
+
+    OutputStream<String, String, String> outputStream = graph
+        .getOutputStream(OUTPUT_TOPIC, keyFn, m -> m);
+
+    FilterFunction<String> filterFn = pageView -> !FILTER_KEY.equals(new PageView(pageView).getUserId());
+
+    pageViews
+        .partitionBy(keyFn)
+        .filter(filterFn)
+        .sendTo(outputStream);
+  }
+}
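
PageViewFilterApp repartitions page views by userId and then drops those whose userId is "badKey". The sketch below mimics both steps on an in-memory list. The hash-based partitionFor helper is a hypothetical stand-in (Kafka's default partitioner uses a different hash), and the "userId,country,pageId" layout is assumed from the sample input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory version of PageViewFilterApp: route by userId, drop "badKey".
final class PageViewFilterSketch {

  static final String FILTER_KEY = "badKey";

  // Assumed page view layout "userId,country,pageId".
  static String userId(String pageView) {
    return pageView.split(",")[0];
  }

  // Hypothetical partitioner: messages with equal keys always land in the same partition.
  static int partitionFor(String key, int numPartitions) {
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  static List<List<String>> partitionAndFilter(List<String> pageViews, int numPartitions) {
    List<List<String>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<>());
    }
    for (String pageView : pageViews) {
      String key = userId(pageView);
      if (FILTER_KEY.equals(key)) {
        continue; // the example's filterFn returns false for these messages
      }
      partitions.get(partitionFor(key, numPartitions)).add(pageView);
    }
    return partitions;
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList(
        "user1,india,google.com", "badKey,usa,spam.com", "user2,china,yahoo.com");
    System.out.println(partitionAndFilter(input, 2));
  }
}
```

The sketch filters before routing. Because the predicate only inspects the message itself, the surviving messages and their partitions are the same as when repartitioning first, as the application does; filtering first would simply avoid sending bad events through the intermediate stream.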

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
new file mode 100644
index 0000000..7ec4f9d
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.function.Function;
+
+/**
+ * In this example, we group page views by userId into sessions, and compute the number of page views for each user
+ * session. A session is considered closed after 3 seconds pass with no page views from that user.
+ *
+ * <p>Concepts covered: Using session windows to group data in a stream, Re-partitioning a stream.
+ *
+ * To run this example:
+ *
+ * <ol>
+ *   <li>
+ *     Ensure that the topic "pageview-session-input" is created  <br/>
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the ./deploy/samza/bin/run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ *     --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties
+ *   </li>
+ *   <li>
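+ *     Produce some messages to the "pageview-session-input" topic using ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-session-input --broker-list localhost:9092 <br/>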
+ *     user1,india,google.com <br/>
+ *     user2,china,yahoo.com
+ *   </li>
+ *   <li>
+ *     Consume messages from the "pageview-session-output" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-session-output <br/>
+ *     --property print.key=true
+ *   </li>
+ * </ol>
+ *
+ */
+public class PageViewSessionizerApp implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PageViewSessionizerApp.class);
+  private static final String INPUT_TOPIC = "pageview-session-input";
+  private static final String OUTPUT_TOPIC = "pageview-session-output";
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
+
+    OutputStream<String, String, WindowPane<String, Collection<String>>> outputStream = graph
+        .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> String.valueOf(m.getMessage().size()));
+
+    Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
+
+    pageViews
+        .partitionBy(keyFn)
+        .window(Windows.keyedSessionWindow(keyFn, Duration.ofSeconds(3)))
+        .sendTo(outputStream);
+  }
+}
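
A keyed session window collects each user's page views until that user has been idle for the gap (3 seconds here), then emits the session, which the output stream turns into a count. The sketch below reproduces that grouping over explicit timestamps. It is a simplification, not Samza's windowing code: Samza closes session windows from timers, whereas here an idle session is closed either when the same key reappears after the gap or when advanceTo is called. SessionWindowSketch, onMessage, and advanceTo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-key session window over explicit timestamps (ms).
final class SessionWindowSketch {

  // State of a key's currently open session.
  private static final class Session {
    long lastSeenMs;
    int count;
  }

  private final long gapMs;
  private final Map<String, Session> open = new TreeMap<>(); // sorted for deterministic output
  final List<String> closed = new ArrayList<>();             // "key=count" per closed session

  SessionWindowSketch(long gapMs) {
    this.gapMs = gapMs;
  }

  void onMessage(String key, long timestampMs) {
    Session session = open.get(key);
    if (session != null && timestampMs - session.lastSeenMs >= gapMs) {
      closed.add(key + "=" + session.count); // the key was idle for the whole gap: emit its session
      session = null;
    }
    if (session == null) {
      session = new Session();
      open.put(key, session);
    }
    session.count++;
    session.lastSeenMs = timestampMs;
  }

  // Closes every session idle for at least gapMs at time nowMs (what a timer would do).
  void advanceTo(long nowMs) {
    open.entrySet().removeIf(e -> {
      if (nowMs - e.getValue().lastSeenMs >= gapMs) {
        closed.add(e.getKey() + "=" + e.getValue().count);
        return true;
      }
      return false;
    });
  }

  public static void main(String[] args) {
    SessionWindowSketch sessions = new SessionWindowSketch(3_000L); // 3-second gap, as in the example
    sessions.onMessage("user1", 0L);
    sessions.onMessage("user1", 1_000L);
    sessions.onMessage("user2", 500L);
    sessions.onMessage("user1", 2_000L);
    sessions.onMessage("user1", 6_000L); // 4 s after user1's last view: starts a new session
    sessions.advanceTo(10_000L);
    System.out.println(sessions.closed); // [user1=3, user1=1, user2=1]
  }
}
```

user2's session is emitted only when advanceTo runs, so it appears last; a timer-driven window would have closed it shortly after 3.5 s.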

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
new file mode 100644
index 0000000..1bc6ff4
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.function.Function;
+
+/**
+ * In this example, we group a stream of page views by country and count the page views for each country over
+ * 3-second tumbling time windows.
+ *
+ * <p> Concepts covered: Performing Group-By style aggregations on tumbling time windows.
+ *
+ * <p> Tumbling windows divide a stream into a set of contiguous, fixed-sized, non-overlapping time intervals.
+ *
+ * To run this example:
+ *
+ * <ol>
+ *   <li>
+ *     Ensure that the topic "pageview-tumbling-input" is created  <br/>
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the ./deploy/samza/bin/run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ *     --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties
+ *   </li>
+ *   <li>
+ *     Produce some messages to the "pageview-tumbling-input" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-tumbling-input --broker-list localhost:9092 <br/>
+ *     user1,india,google.com <br/>
+ *     user2,china,yahoo.com
+ *   </li>
+ *   <li>
+ *     Consume messages from the "pageview-tumbling-output" topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output --property print.key=true
+ *   </li>
+ * </ol>
+ *
+ */
+public class TumblingPageViewCounterApp implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TumblingPageViewCounterApp.class);
+  private static final String INPUT_TOPIC = "pageview-tumbling-input";
+  private static final String OUTPUT_TOPIC = "pageview-tumbling-output";
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
+
+    OutputStream<String, String, WindowPane<String, Integer>> outputStream = graph
+        .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> m.getMessage().toString());
+
+    Function<String, String> keyFn = pageView -> new PageView(pageView).getCountry();
+
+    pageViews
+        .partitionBy(keyFn)
+        .window(Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(3), () -> 0, (m, prevCount) -> prevCount + 1))
+        .sendTo(outputStream);
+  }
+}
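
keyedTumblingWindow is given a key function, a window length, an initial value supplier (() -> 0), and a fold-left function ((m, prevCount) -> prevCount + 1). The sketch below shows how those two functions yield a count per key per window, assigning each message to the window that starts at (timestamp / windowMs) * windowMs. It uses explicit timestamps rather than processing time, assumes the "userId,country,pageId" layout, and the tumble method and "key@windowStart" pane naming are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical keyed tumbling window over explicit timestamps (ms).
final class TumblingCountSketch {

  // Folds every message into the pane for (key, window start); panes are named "key@windowStartMs".
  static <WV> Map<String, WV> tumble(List<String> messages, List<Long> timestampsMs,
                                     Function<String, String> keyFn, long windowMs,
                                     Supplier<WV> initialValue, BiFunction<String, WV, WV> foldLeft) {
    Map<String, WV> panes = new TreeMap<>();
    for (int i = 0; i < messages.size(); i++) {
      String msg = messages.get(i);
      long windowStart = (timestampsMs.get(i) / windowMs) * windowMs; // fixed, non-overlapping windows
      String pane = keyFn.apply(msg) + "@" + windowStart;
      WV previous = panes.containsKey(pane) ? panes.get(pane) : initialValue.get();
      panes.put(pane, foldLeft.apply(msg, previous));
    }
    return panes;
  }

  public static void main(String[] args) {
    List<String> pageViews = Arrays.asList(
        "user1,india,google.com", "user2,china,yahoo.com", "user3,india,google.com", "user4,india,bing.com");
    List<Long> times = Arrays.asList(0L, 1_000L, 2_000L, 4_000L);
    // Key by country (assumed second field); same initial value and fold as the example.
    Map<String, Integer> counts = tumble(pageViews, times, pv -> pv.split(",")[1], 3_000L,
        () -> 0, (m, prevCount) -> prevCount + 1);
    System.out.println(counts); // {china@0=1, india@0=2, india@3000=1}
  }
}
```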

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
new file mode 100644
index 0000000..c320209
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.application;
+
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import samza.examples.wikipedia.model.WikipediaParser;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+
+
+/**
+ * This {@link StreamApplication} demonstrates the Samza fluent API by performing the same operations as
+ * {@link samza.examples.wikipedia.task.WikipediaFeedStreamTask},
+ * {@link samza.examples.wikipedia.task.WikipediaParserStreamTask}, and
+ * {@link samza.examples.wikipedia.task.WikipediaStatsStreamTask} in one expression.
+ *
+ * The only functional difference is the lack of "wikipedia-raw" and "wikipedia-edits"
+ * streams to connect the operators, as they are not needed with the fluent API.
+ *
+ * The application processes Wikipedia events in the following steps:
+ * <ul>
+ *   <li>Merge Wikipedia, Wiktionary, and Wikinews events into one stream</li>
+ *   <li>Parse each event to a more structured format</li>
+ *   <li>Aggregate some stats over a 10s window</li>
+ *   <li>Format each window output for public consumption</li>
+ *   <li>Send the window output to Kafka</li>
+ * </ul>
+ *
+ * All of this application logic is defined in the {@link #init(StreamGraph, Config)} method, which
+ * is invoked by the framework to load the application.
+ */
+public class WikipediaApplication implements StreamApplication {
+  private static final Logger log = LoggerFactory.getLogger(WikipediaApplication.class);
+
+  // Inputs
+  private static final String WIKIPEDIA_STREAM_ID = "en-wikipedia";
+  private static final String WIKTIONARY_STREAM_ID = "en-wiktionary";
+  private static final String WIKINEWS_STREAM_ID = "en-wikinews";
+
+  // Outputs
+  private static final String STATS_STREAM_ID = "wikipedia-stats";
+
+  // Stores
+  private static final String STATS_STORE_NAME = "wikipedia-stats";
+
+  // Metrics
+  private static final String EDIT_COUNT_KEY = "count-edits-all-time";
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    // Inputs
+    // Messages come from WikipediaConsumer so we know the type is WikipediaFeedEvent
+    // They are un-keyed, so the 'k' parameter to the msgBuilder is not used
+    MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+    MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+    MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+
+    // Output (also un-keyed, so the key extractor always returns null)
+    OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats = graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m);
+
+    // Merge inputs
+    MessageStream<WikipediaFeedEvent> allWikipediaEvents = MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
+
+    // Parse, update stats, prepare output, and send
+    allWikipediaEvents.map(WikipediaParser::parseEvent)
+        .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()))
+        .map(this::formatOutput)
+        .sendTo(wikipediaStats);
+  }
+
+  /**
+   * A few statistics about the incoming messages.
+   */
+  private static class WikipediaStats {
+    // Windowed stats
+    int edits = 0;
+    int byteDiff = 0;
+    Set<String> titles = new HashSet<String>();
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+
+    // Total stats
+    int totalEdits = 0;
+
+    @Override
+    public String toString() {
+      return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
+    }
+  }
+
+  /**
+   * Updates the windowed and total stats based on each "edit" event.
+   *
+   * Uses a KeyValueStore to persist a total edit count across restarts.
+   */
+  private class WikipediaStatsAggregator implements FoldLeftFunction<Map<String, Object>, WikipediaStats> {
+
+    private KeyValueStore<String, Integer> store;
+
+    // Example metric. Running counter of the number of repeat edits of the same title within a single window.
+    private Counter repeatEdits;
+
+    /**
+     * {@inheritDoc}
+     * Override {@link org.apache.samza.operators.functions.InitableFunction#init(Config, TaskContext)} to
+     * get a KeyValueStore for persistence and the MetricsRegistry for metrics.
+     */
+    @Override
+    public void init(Config config, TaskContext context) {
+      store = (KeyValueStore<String, Integer>) context.getStore(STATS_STORE_NAME);
+      repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits");
+    }
+
+    @Override
+    public WikipediaStats apply(Map<String, Object> edit, WikipediaStats stats) {
+
+      // Update persisted total
+      Integer editsAllTime = store.get(EDIT_COUNT_KEY);
+      if (editsAllTime == null) editsAllTime = 0;
+      editsAllTime++;
+      store.put(EDIT_COUNT_KEY, editsAllTime);
+
+      // Update window stats
+      stats.edits++;
+      stats.totalEdits = editsAllTime;
+      stats.byteDiff += (Integer) edit.get("diff-bytes");
+      boolean newTitle = stats.titles.add((String) edit.get("title"));
+
+      Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags");
+      for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
+        if (Boolean.TRUE.equals(flag.getValue())) {
+          stats.counts.compute(flag.getKey(), (k, v) -> v == null ? 1 : v + 1);
+        }
+      }
+
+      if (!newTitle) {
+        repeatEdits.inc();
+        log.info("Frequent edits for title: {}", edit.get("title"));
+      }
+      return stats;
+    }
+  }
+
+  /**
+   * Format the stats for output to Kafka.
+   */
+  private Map<String, Integer> formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
+
+    WikipediaStats stats = statsWindowPane.getMessage();
+
+    Map<String, Integer> counts = new HashMap<String, Integer>(stats.counts);
+    counts.put("edits", stats.edits);
+    counts.put("edits-all-time", stats.totalEdits);
+    counts.put("bytes-added", stats.byteDiff);
+    counts.put("unique-titles", stats.titles.size());
+
+    return counts;
+  }
+}
+
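
WikipediaStatsAggregator folds each parsed edit into a WikipediaStats value. The sketch below replays the windowed part of that fold (edit count, byte diff, unique titles, and per-flag counts) and the flattening done by formatOutput, without the KeyValueStore or metrics. It uses Map.merge so that the first occurrence of a flag counts as 1. WikipediaStatsSketch is a hypothetical class, and the flag names in main are illustrative.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, store-free replay of the windowed part of WikipediaStatsAggregator.apply and formatOutput.
final class WikipediaStatsSketch {
  int edits = 0;
  int byteDiff = 0;
  final Set<String> titles = new HashSet<>();
  final Map<String, Integer> counts = new HashMap<>();

  // One fold step for a single parsed edit.
  WikipediaStatsSketch apply(String title, int diffBytes, Map<String, Boolean> flags) {
    edits++;
    byteDiff += diffBytes;
    titles.add(title);
    for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
      if (Boolean.TRUE.equals(flag.getValue())) {
        counts.merge(flag.getKey(), 1, Integer::sum); // 1 for a new flag, otherwise +1
      }
    }
    return this;
  }

  // Flattens the stats into one map, like formatOutput (without "edits-all-time").
  Map<String, Integer> format() {
    Map<String, Integer> out = new HashMap<>(counts);
    out.put("edits", edits);
    out.put("bytes-added", byteDiff);
    out.put("unique-titles", titles.size());
    return out;
  }

  // Convenience builder: every named flag is set to true.
  static Map<String, Boolean> flagsSet(String... names) {
    Map<String, Boolean> flags = new HashMap<>();
    for (String name : names) {
      flags.put(name, Boolean.TRUE);
    }
    return flags;
  }

  public static void main(String[] args) {
    WikipediaStatsSketch stats = new WikipediaStatsSketch();
    stats.apply("Samza", 120, flagsSet("is-minor"));
    stats.apply("Samza", -20, flagsSet("is-minor", "is-new"));
    stats.apply("Kafka", 40, flagsSet());
    Map<String, Integer> out = stats.format();
    System.out.println(out.get("is-minor") + " " + out.get("edits") + " " + out.get("bytes-added")); // 2 3 140
  }
}
```

Map.merge(key, 1, Integer::sum) stores 1 for a new key and adds 1 otherwise, which gives the same counts as compute with v == null ? 1 : v + 1.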

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
new file mode 100644
index 0000000..51dd28f
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.application;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.Config;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.util.CommandLine;
+import org.apache.samza.util.Util;
+
+
+/**
+ * An entry point for {@link WikipediaApplication} that runs it in standalone mode using ZooKeeper.
+ * It waits for the job to finish; the job can also be ended by killing this process.
+ */
+public class WikipediaZkLocalApplication {
+
+  /**
+   * Executes the application using the local application runner.
+   * It takes two required command-line arguments:
+   *  --config-factory: the fully qualified class name of a config factory, e.g. {@link org.apache.samza.config.factories.PropertiesConfigFactory}
+   *  --config-path: path to the application properties file
+   *
+   * @param args command line arguments
+   */
+  public static void main(String[] args) {
+    CommandLine cmdLine = new CommandLine();
+    OptionSet options = cmdLine.parser().parse(args);
+    Config config = cmdLine.loadConfig(options);
+
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+    WikipediaApplication app = new WikipediaApplication();
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+}
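
The main method reads its configuration through CommandLine from --config-factory and --config-path. As a rough illustration of what a properties-based config factory does with a file:// path like the ones in the run instructions, the sketch below loads a java.util.Properties file into a flat key/value map. It is not Samza's PropertiesConfigFactory, and PropertiesConfigSketch.load is a hypothetical helper.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: loads a properties file named by a URI such as "file:///path/app.properties".
final class PropertiesConfigSketch {

  static Map<String, String> load(String configPath) throws IOException {
    Properties props = new Properties();
    try (InputStream in = Files.newInputStream(Paths.get(URI.create(configPath)))) {
      props.load(in); // standard key=value / key:value syntax, '#' comments
    }
    Map<String, String> config = new TreeMap<>();
    for (String name : props.stringPropertyNames()) {
      config.put(name, props.getProperty(name));
    }
    return config;
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("usage: PropertiesConfigSketch file:///absolute/path/to/app.properties");
      return;
    }
    System.out.println(load(args[0]));
  }
}
```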