Posted to commits@samza.apache.org by jm...@apache.org on 2017/06/10 00:17:09 UTC
[2/2] samza-hello-samza git commit: Merge branch 'latest' into 'master' for 0.13.0 release
Merge branch 'latest' into 'master' for 0.13.0 release
Author: Jacob Maes <jm...@linkedin.com>
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Author: Navina Ramesh <nr...@linkedin.com>
Author: Jacob Maes <ja...@gmail.com>
Author: vjagadish1989 <jv...@linkedin.com>
Author: Yi Pan <ni...@gmail.com>
Author: Aleksandar Pejakovic <a....@levi9.com>
Author: Ken Gidley <kg...@yahoo.com>
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Author: Vishal Kuo <vi...@gmail.com>
Author: Branislav Cogic <b....@levi9.com>
Author: Stanislav Los <sl...@gmail.com>
Author: Steven Aerts <st...@gmail.com>
Author: Yan Fang <ya...@gmail.com>
Reviewers: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Closes #21 from jmakes/master
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/1a34a83d
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/1a34a83d
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/1a34a83d
Branch: refs/heads/master
Commit: 1a34a83d132ffe0e6a4a08d3fc3646cd14d4e0ad
Parents: 4384cb0
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri Jun 9 17:16:43 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri Jun 9 17:16:43 2017 -0700
----------------------------------------------------------------------
.gitignore | 1 +
README.md | 6 +-
bin/grid | 56 +-
bin/merge-pull-request.py | 508 +++++++++++++++++++
bin/run-wikipedia-zk-application.sh | 30 ++
build.gradle | 2 +
gradle.properties | 4 +-
pom.xml | 10 +-
src/main/assembly/src.xml | 27 +-
.../config/pageview-adclick-joiner.properties | 46 ++
src/main/config/pageview-filter.properties | 46 ++
src/main/config/pageview-sessionizer.properties | 46 ++
.../config/tumbling-pageview-counter.properties | 46 ++
...ikipedia-application-local-runner.properties | 59 +++
.../config/wikipedia-application.properties | 67 +++
src/main/config/wikipedia-parser.properties | 6 -
src/main/config/wikipedia-stats.properties | 10 +-
.../java/samza/examples/cookbook/AdClick.java | 58 +++
.../java/samza/examples/cookbook/PageView.java | 61 +++
.../cookbook/PageViewAdClickJoiner.java | 115 +++++
.../examples/cookbook/PageViewFilterApp.java | 86 ++++
.../cookbook/PageViewSessionizerApp.java | 87 ++++
.../cookbook/TumblingPageViewCounterApp.java | 90 ++++
.../application/WikipediaApplication.java | 193 +++++++
.../WikipediaZkLocalApplication.java | 54 ++
.../wikipedia/model/WikipediaParser.java | 80 +++
.../task/WikipediaParserStreamTask.java | 57 +--
.../task/WikipediaStatsStreamTask.java | 24 +-
src/main/resources/log4j.xml | 19 +-
29 files changed, 1794 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 849ce6a..f31af00 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,3 +30,4 @@ deploy
*.swp
build/
.gradle/
+state
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 0a71cf6..0f80e9e 100644
--- a/README.md
+++ b/README.md
@@ -3,12 +3,12 @@ hello-samza
Hello Samza is a starter project for [Apache Samza](http://samza.apache.org/) jobs.
-Please see [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) to get started.
+Please see [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) to get started.
### Pull requests and questions
-[Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) is developed as part of the [Apache Samza](http://samza.apache.org) project. Please direct questions, improvements and bug fixes there. Questions about [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) are welcome on the [dev list](http://samza.apache.org/community/mailing-lists.html) and the [Samza JIRA](https://issues.apache.org/jira/browse/SAMZA) has a hello-samza component for filing tickets.
+[Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) is developed as part of the [Apache Samza](http://samza.apache.org) project. Please direct questions, improvements and bug fixes there. Questions about [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) are welcome on the [dev list](http://samza.apache.org/community/mailing-lists.html) and the [Samza JIRA](https://issues.apache.org/jira/browse/SAMZA) has a hello-samza component for filing tickets.
### Contribution
-To start contributing on [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) first read [Rules](http://samza.apache.org/contribute/rules.html) and [Contributor Corner](https://cwiki.apache.org/confluence/display/SAMZA/Contributor%27s+Corner). Notice that [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) git repository does not support git pull request.
\ No newline at end of file
+To start contributing on [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) first read [Rules](http://samza.apache.org/contribute/rules.html) and [Contributor Corner](https://cwiki.apache.org/confluence/display/SAMZA/Contributor%27s+Corner). Notice that [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) git repository does not support git pull request.
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index 497fa06..5dff403 100755
--- a/bin/grid
+++ b/bin/grid
@@ -35,10 +35,16 @@ DOWNLOAD_CACHE_DIR=$HOME/.samza/download
COMMAND=$1
SYSTEM=$2
-DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
+DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
+SERVICE_WAIT_TIMEOUT_SEC=20
+ZOOKEEPER_PORT=2181
+RESOURCEMANAGER_PORT=8032
+NODEMANAGER_PORT=8042
+KAFKA_PORT=9092
+
bootstrap() {
echo "Bootstrapping the system..."
stop_all
@@ -49,6 +55,16 @@ bootstrap() {
exit 0
}
+standalone() {
+ echo "Setting up the system..."
+ stop_all
+ rm -rf "$DEPLOY_ROOT_DIR"
+ mkdir "$DEPLOY_ROOT_DIR"
+ install_all_without_yarn
+ start_all_without_yarn
+ exit 0
+}
+
install_all() {
$DIR/grid install samza
$DIR/grid install zookeeper
@@ -56,6 +72,12 @@ install_all() {
$DIR/grid install kafka
}
+install_all_without_yarn() {
+ $DIR/grid install samza
+ $DIR/grid install zookeeper
+ $DIR/grid install kafka
+}
+
install_samza() {
mkdir -p "$DEPLOY_ROOT_DIR"
if [ -d "$DOWNLOAD_CACHE_DIR/samza/.git" ]; then
@@ -90,7 +112,7 @@ install_yarn() {
install_kafka() {
mkdir -p "$DEPLOY_ROOT_DIR"
- install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.0.1
+ install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.1.1
# have to use SIGTERM since nohup on appears to ignore SIGINT
# and Kafka switched to SIGINT in KAFKA-1031.
sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh
@@ -122,10 +144,16 @@ start_all() {
$DIR/grid start kafka
}
+start_all_without_yarn() {
+ $DIR/grid start zookeeper
+ $DIR/grid start kafka
+}
+
start_zookeeper() {
if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh ]; then
cd $DEPLOY_ROOT_DIR/$SYSTEM
bin/zkServer.sh start
+ wait_for_service "zookeeper" $ZOOKEEPER_PORT
cd - > /dev/null
else
echo 'Zookeeper is not installed. Run: bin/grid install zookeeper'
@@ -135,7 +163,9 @@ start_zookeeper() {
start_yarn() {
if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh ]; then
$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh start resourcemanager
+ wait_for_service "resourcemanager" $RESOURCEMANAGER_PORT
$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh start nodemanager
+ wait_for_service "nodemanager" $NODEMANAGER_PORT
else
echo 'YARN is not installed. Run: bin/grid install yarn'
fi
@@ -147,11 +177,29 @@ start_kafka() {
cd $DEPLOY_ROOT_DIR/$SYSTEM
nohup bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &
cd - > /dev/null
+ wait_for_service "kafka" $KAFKA_PORT
else
echo 'Kafka is not installed. Run: bin/grid install kafka'
fi
}
+wait_for_service() {
+ local SERVICE_NAME=$1
+ local PORT=$2
+ echo "Waiting for $SERVICE_NAME to start..."
+ local CURRENT_WAIT_TIME=0
+ until $(nc -w 1 localhost $PORT); do
+ printf '.'
+ sleep 1
+ if [ $((++CURRENT_WAIT_TIME)) -eq $SERVICE_WAIT_TIMEOUT_SEC ]; then
+ printf "\nError: timed out while waiting for $SERVICE_NAME to start.\n"
+ exit 1
+ fi
+ done
+ printf '\n'
+ echo "$SERVICE_NAME has started";
+}
+
stop_all() {
$DIR/grid stop kafka
$DIR/grid stop yarn
@@ -191,6 +239,9 @@ stop_kafka() {
if [ "$COMMAND" == "bootstrap" ] && test -z "$SYSTEM"; then
bootstrap
exit 0
+elif [ "$COMMAND" == "standalone" ] && test -z "$SYSTEM"; then
+ standalone
+ exit 0
elif (test -z "$COMMAND" && test -z "$SYSTEM") \
|| ( [ "$COMMAND" == "help" ] || test -z "$COMMAND" || test -z "$SYSTEM"); then
echo
@@ -198,6 +249,7 @@ elif (test -z "$COMMAND" && test -z "$SYSTEM") \
echo
echo " $ grid"
echo " $ grid bootstrap"
+ echo " $ grid standalone"
echo " $ grid install [yarn|kafka|zookeeper|samza|all]"
echo " $ grid start [yarn|kafka|zookeeper|all]"
echo " $ grid stop [yarn|kafka|zookeeper|all]"
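The new wait_for_service helper above polls a service's TCP port once per second until it accepts a connection or a 20-second timeout elapses. A rough Python equivalent of that loop (an illustrative sketch only; the function name and defaults are ours, not part of this commit):

```python
import socket
import time

def wait_for_service(name, port, host="localhost", timeout_sec=20):
    """Poll a TCP port once per second until the service accepts
    connections, raising if the timeout elapses first (mirrors the
    nc-based loop in bin/grid)."""
    for _ in range(timeout_sec):
        try:
            # A successful connect means the service is listening.
            with socket.create_connection((host, port), timeout=1):
                print("%s has started" % name)
                return True
        except OSError:
            time.sleep(1)
    raise TimeoutError("timed out while waiting for %s to start" % name)
```

In bin/grid itself the probe is nc, which exits non-zero until the port is open; that exit status is what drives the `until` loop.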
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/bin/merge-pull-request.py
----------------------------------------------------------------------
diff --git a/bin/merge-pull-request.py b/bin/merge-pull-request.py
new file mode 100755
index 0000000..fc5979b
--- /dev/null
+++ b/bin/merge-pull-request.py
@@ -0,0 +1,508 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Utility for creating well-formed pull request merges and pushing them to Apache. This script is a modified version
+of the one created by the Kafka project (https://github.com/apache/kafka/blob/trunk/kafka-merge-pr.py).
+
+ Usage: ./merge-pull-request.py (see config env vars below)
+
+This utility assumes you already have a local samza git folder and that you have added remotes corresponding to both:
+ (i) the github apache samza mirror and
+ (ii) the apache samza git repo.
+
+Note:
+ This script has been borrowed from the Apache Kafka team that can be found here:
+ https://github.com/apache/kafka/blob/trunk/kafka-merge-pr.py
+ It has been modified for the Apache Samza project.
+"""
+
+import json
+import os
+import re
+import subprocess
+import sys
+import urllib2
+
+try:
+ import jira.client
+
+ JIRA_IMPORTED = True
+except ImportError:
+ JIRA_IMPORTED = False
+
+PROJECT_NAME = "samza-hello-samza"
+
+# JIRA project name
+CAPITALIZED_PROJECT_NAME = "samza".upper()
+
+# Remote name which points to the GitHub site
+PR_REMOTE_NAME = os.environ.get("PR_REMOTE_NAME", "samza-github")
+# Remote name which points to Apache git
+PUSH_REMOTE_NAME = os.environ.get("PUSH_REMOTE_NAME", "samza-apache")
+# ASF JIRA username
+JIRA_USERNAME = os.environ.get("JIRA_USERNAME", "")
+# ASF JIRA password
+JIRA_PASSWORD = os.environ.get("JIRA_PASSWORD", "")
+
+"""
+OAuth key used for issuing requests against the GitHub API. If this is not defined, then requests
+will be unauthenticated. You should only need to configure this if you find yourself regularly
+exceeding your IP's unauthenticated request rate limit. You can create an OAuth key at
+https://github.com/settings/tokens. This script only requires the "public_repo" scope.
+"""
+GITHUB_OAUTH_KEY = os.environ.get("GITHUB_OAUTH_KEY", "")
+
+GITHUB_BASE = "https://github.com/apache/%s/pull" % (PROJECT_NAME)
+GITHUB_API_BASE = "https://api.github.com/repos/apache/%s" % (PROJECT_NAME)
+JIRA_BASE = "https://issues.apache.org/jira/browse"
+JIRA_API_BASE = "https://issues.apache.org/jira"
+# Prefix added to temporary branches
+TEMP_BRANCH_PREFIX = "PR_TOOL"
+
+
+def get_json(url):
+ try:
+ request = urllib2.Request(url)
+ if GITHUB_OAUTH_KEY:
+ request.add_header('Authorization', 'token %s' % GITHUB_OAUTH_KEY)
+ return json.load(urllib2.urlopen(request))
+ except urllib2.HTTPError as e:
+ if "X-RateLimit-Remaining" in e.headers and e.headers["X-RateLimit-Remaining"] == '0':
+ print "Exceeded the GitHub API rate limit; see the instructions in " + \
+ "merge-pull-request.py to configure an OAuth token for making authenticated " + \
+ "GitHub requests."
+ else:
+ print "Unable to fetch URL, exiting: %s" % url
+ sys.exit(-1)
+
+
+def fail(msg):
+ print msg
+ clean_up()
+ sys.exit(-1)
+
+
+def run_cmd(cmd):
+ if isinstance(cmd, list):
+ # Add debugging information on the command being run.
+ print " ".join(cmd)
+ return subprocess.check_output(cmd)
+ else:
+ print cmd
+ return subprocess.check_output(cmd.split(" "))
+
+
+def continue_maybe(prompt):
+ result = raw_input("\n%s (y/n): " % prompt)
+ if result.lower() != "y":
+ fail("Okay, exiting")
+
+
+def clean_up():
+ if original_head != get_current_branch():
+ print "Restoring head pointer to %s" % original_head
+ run_cmd("git checkout %s" % original_head)
+
+ branches = run_cmd("git branch").replace(" ", "").split("\n")
+
+ for branch in filter(lambda x: x.startswith(TEMP_BRANCH_PREFIX), branches):
+ print "Deleting local branch %s" % branch
+ run_cmd("git branch -D %s" % branch)
+
+
+def get_current_branch():
+ return run_cmd("git rev-parse --abbrev-ref HEAD").replace("\n", "")
+
+def merge_pr(pr_num, target_ref, title, body, pr_repo_desc):
+ """
+ Merge the requested pull request and return the merge hash.
+
+ :param pr_num: pull request number
+ :param target_ref: name of the branch the pull request targets
+ :param title: commit title
+ :param body: commit body (the pull request description)
+ :param pr_repo_desc: "user/branch" description of the pull request source
+ :return: abbreviated hash of the resulting merge commit
+ """
+ pr_branch_name = "%s_MERGE_PR_%s" % (TEMP_BRANCH_PREFIX, pr_num)
+ target_branch_name = "%s_MERGE_PR_%s_%s" % (TEMP_BRANCH_PREFIX, pr_num, target_ref.upper())
+ run_cmd("git fetch %s pull/%s/head:%s" % (PR_REMOTE_NAME, pr_num, pr_branch_name))
+ run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, target_ref, target_branch_name))
+ run_cmd("git checkout %s" % target_branch_name)
+
+ had_conflicts = False
+ try:
+ run_cmd(['git', 'merge', pr_branch_name, '--squash'])
+ except Exception as e:
+ msg = "Error merging: %s\nWould you like to manually fix-up this merge?" % e
+ continue_maybe(msg)
+ msg = "Okay, please fix any conflicts and 'git add' conflicting files... Finished?"
+ continue_maybe(msg)
+ had_conflicts = True
+
+ commit_authors = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
+ '--pretty=format:%an <%ae>']).split("\n")
+ distinct_authors = sorted(set(commit_authors),
+ key=lambda x: commit_authors.count(x), reverse=True)
+ primary_author = raw_input(
+ "Enter primary author in the format of \"name <email>\" [%s]: " %
+ distinct_authors[0])
+ if primary_author == "":
+ primary_author = distinct_authors[0]
+
+ reviewers = raw_input(
+ "Enter reviewers in the format of \"name1 <email1>, name2 <email2>\": ").strip()
+
+ commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
+ '--pretty=format:%h [%an] %s']).split("\n")
+
+ if len(commits) > 1:
+ result = raw_input("List pull request commits in squashed commit message? (y/n): ")
+ if result.lower() == "y":
+ should_list_commits = True
+ else:
+ should_list_commits = False
+ else:
+ should_list_commits = False
+
+ merge_message_flags = []
+
+ merge_message_flags += ["-m", title]
+ if body is not None:
+ # We remove @ symbols from the body to avoid triggering e-mails
+ # to people every time someone creates a public fork of the project.
+ merge_message_flags += ["-m", body.replace("@", "")]
+
+ authors = "\n".join(["Author: %s" % a for a in distinct_authors])
+
+ merge_message_flags += ["-m", authors]
+
+ if (reviewers != ""):
+ merge_message_flags += ["-m", "Reviewers: %s" % reviewers]
+
+ if had_conflicts:
+ committer_name = run_cmd("git config --get user.name").strip()
+ committer_email = run_cmd("git config --get user.email").strip()
+ message = "This patch had conflicts when merged, resolved by\nCommitter: %s <%s>" % (
+ committer_name, committer_email)
+ merge_message_flags += ["-m", message]
+
+ # The "Closes #%s" line is required for GitHub to correctly close the PR
+ close_line = "Closes #%s from %s" % (pr_num, pr_repo_desc)
+ if should_list_commits:
+ close_line += " and squashes the following commits:"
+ merge_message_flags += ["-m", close_line]
+
+ if should_list_commits:
+ merge_message_flags += ["-m", "\n".join(commits)]
+
+ run_cmd(['git', 'commit', '--author="%s"' % primary_author] + merge_message_flags)
+
+ continue_maybe("Merge complete (local ref %s). Push to %s?" % (
+ target_branch_name, PUSH_REMOTE_NAME))
+
+ try:
+ run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, target_branch_name, target_ref))
+ except Exception as e:
+ clean_up()
+ fail("Exception while pushing: %s" % e)
+
+ merge_hash = run_cmd("git rev-parse %s" % target_branch_name)[:8]
+ clean_up()
+ print("Pull request #%s merged!" % pr_num)
+ print("Merge hash: %s" % merge_hash)
+ return merge_hash
+
+
+def cherry_pick(pr_num, merge_hash, default_branch):
+ pick_ref = raw_input("Enter a branch name [%s]: " % default_branch)
+ if pick_ref == "":
+ pick_ref = default_branch
+
+ pick_branch_name = "%s_PICK_PR_%s_%s" % (TEMP_BRANCH_PREFIX, pr_num, pick_ref.upper())
+
+ run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, pick_ref, pick_branch_name))
+ run_cmd("git checkout %s" % pick_branch_name)
+
+ try:
+ run_cmd("git cherry-pick -sx %s" % merge_hash)
+ except Exception as e:
+ msg = "Error cherry-picking: %s\nWould you like to manually fix-up this merge?" % e
+ continue_maybe(msg)
+ msg = "Okay, please fix any conflicts and finish the cherry-pick. Finished?"
+ continue_maybe(msg)
+
+ continue_maybe("Pick complete (local ref %s). Push to %s?" % (
+ pick_branch_name, PUSH_REMOTE_NAME))
+
+ try:
+ run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, pick_branch_name, pick_ref))
+ except Exception as e:
+ clean_up()
+ fail("Exception while pushing: %s" % e)
+
+ pick_hash = run_cmd("git rev-parse %s" % pick_branch_name)[:8]
+ clean_up()
+
+ print("Pull request #%s picked into %s!" % (pr_num, pick_ref))
+ print("Pick hash: %s" % pick_hash)
+ return pick_ref
+
+
+def fix_version_from_branch(branch, versions):
+ """
+ Pick the default fix-version for a branch name.
+ :param branch: name of a branch the change was merged to
+ :param versions: unreleased version names, sorted newest-first (includes the version tracking "master")
+ :return: the newest unreleased version if the branch is "master"; otherwise the oldest
+ version whose name starts with the branch name, or None if there is no match.
+ """
+ if branch == "master":
+ if len(versions) > 0:
+ return versions[0]
+ else:
+ return None
+ else:
+ v = filter(lambda x: x.startswith(branch), versions)
+ if len(v) > 0:
+ return v[-1]
+ else:
+ print("Could not find branch %s in versions: %s" % (branch, versions))
+ return None
+
+
+def resolve_jira_issue(title, merge_branches, comment):
+ """
+ Updates the assignee (if not already provided) and fix-versions before resolving the JIRA
+
+ :param title: Standardized commit title
+ :param merge_branches: Default fix-versions for this JIRA
+ :param comment: Comment added to the JIRA
+ """
+ asf_jira = jira.client.JIRA({'server': JIRA_API_BASE},
+ basic_auth=(JIRA_USERNAME, JIRA_PASSWORD))
+ default_jira_id = ""
+ jira_ids = re.findall("%s-[0-9]{4,5}" % CAPITALIZED_PROJECT_NAME, title)
+ if len(jira_ids) > 0:
+ default_jira_id = jira_ids[0]
+
+ jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id)
+ if jira_id == "":
+ jira_id = default_jira_id
+
+ try:
+ issue = asf_jira.issue(jira_id)
+ except Exception as e:
+ fail("ASF JIRA could not find %s\n%s" % (jira_id, e))
+
+ cur_status = issue.fields.status.name
+ cur_summary = issue.fields.summary
+ cur_assignee = issue.fields.assignee
+ if cur_assignee is None:
+ cur_assignee = "NOT ASSIGNED!!!"
+ else:
+ cur_assignee = cur_assignee.displayName
+
+ if cur_status == "Resolved" or cur_status == "Closed":
+ fail("JIRA issue %s already has status '%s'" % (jira_id, cur_status))
+ print ("=== JIRA %s ===" % jira_id)
+ print ("summary\t\t%s\nassignee\t%s\nstatus\t\t%s\nurl\t\t%s/%s\n" % (
+ cur_summary, cur_assignee, cur_status, JIRA_BASE, jira_id))
+
+ versions = asf_jira.project_versions(CAPITALIZED_PROJECT_NAME)
+ versions = sorted(versions, key=lambda x: x.name, reverse=True)
+ versions = filter(lambda x: x.raw['released'] is False, versions)
+
+ version_names = map(lambda x: x.name, versions)
+ default_fix_versions = map(lambda x: fix_version_from_branch(x, version_names), merge_branches)
+ default_fix_versions = filter(lambda x: x != None, default_fix_versions)
+ default_fix_versions = ",".join(default_fix_versions)
+
+ fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % default_fix_versions)
+ if fix_versions == "":
+ fix_versions = default_fix_versions
+ fix_versions = fix_versions.replace(" ", "").split(",")
+
+ def get_version_json(version_str):
+ return filter(lambda v: v.name == version_str, versions)[0].raw
+
+ jira_fix_versions = map(lambda v: get_version_json(v), fix_versions)
+
+ resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0]
+ resolution = filter(lambda r: r.raw['name'] == "Fixed", asf_jira.resolutions())[0]
+ asf_jira.transition_issue(
+ jira_id, resolve["id"], fixVersions=jira_fix_versions,
+ comment=comment, resolution={'id': resolution.raw['id']})
+
+ print "Successfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions)
+
+
+def standardize_jira_ref(text):
+ """
+ Standardize the jira reference commit message prefix to "PROJECT_NAME-XXX: Issue"
+ >>> standardize_jira_ref("%s-5954: Top by key" % CAPITALIZED_PROJECT_NAME)
+ 'SAMZA-5954: Top by key'
+ >>> standardize_jira_ref("%s-5821: ParquetRelation2 CTAS should check if delete is successful" % CAPITALIZED_PROJECT_NAME)
+ 'SAMZA-5821: ParquetRelation2 CTAS should check if delete is successful'
+ >>> standardize_jira_ref("%s-4123 [WIP] Show new dependencies added in pull requests" % CAPITALIZED_PROJECT_NAME)
+ 'SAMZA-4123: [WIP] Show new dependencies added in pull requests'
+ >>> standardize_jira_ref("%s 5954: Top by key" % CAPITALIZED_PROJECT_NAME)
+ 'SAMZA-5954: Top by key'
+ >>> standardize_jira_ref("%s-979 a LRU scheduler for load balancing in TaskSchedulerImpl" % CAPITALIZED_PROJECT_NAME)
+ 'SAMZA-979: a LRU scheduler for load balancing in TaskSchedulerImpl'
+ >>> standardize_jira_ref("%s-1094 Support MiMa for reporting binary compatibility across versions." % CAPITALIZED_PROJECT_NAME)
+ 'SAMZA-1094: Support MiMa for reporting binary compatibility across versions.'
+ >>> standardize_jira_ref("[WIP] %s-1146: Vagrant support" % CAPITALIZED_PROJECT_NAME)
+ 'SAMZA-1146: [WIP] Vagrant support'
+ >>> standardize_jira_ref("%s-1032. If Yarn app fails before registering, app master stays aroun..." % CAPITALIZED_PROJECT_NAME)
+ 'SAMZA-1032: If Yarn app fails before registering, app master stays aroun...'
+ >>> standardize_jira_ref("%s-6250 %s-6146 %s-5911: Types are now reserved words in DDL parser." % (CAPITALIZED_PROJECT_NAME, CAPITALIZED_PROJECT_NAME, CAPITALIZED_PROJECT_NAME))
+ 'SAMZA-6250 SAMZA-6146 SAMZA-5911: Types are now reserved words in DDL parser.'
+ >>> standardize_jira_ref("Additional information for users building from source code")
+ 'Additional information for users building from source code'
+
+ :param text: Text provided that will be used as the commit message
+ :return: Standardized commit message that includes the JIRA number as well
+ """
+ jira_refs = []
+ components = []
+
+ # Extract JIRA ref(s):
+ pattern = re.compile(r'(%s[-\s]*[0-9]{3,6})+' % CAPITALIZED_PROJECT_NAME, re.IGNORECASE)
+ for ref in pattern.findall(text):
+ # Add brackets, replace spaces with a dash, & convert to uppercase
+ jira_refs.append(re.sub(r'\s+', '-', ref.upper()))
+ text = text.replace(ref, '')
+
+ # Extract project name component(s):
+ # Look for alphanumeric chars, spaces, dashes, periods, and/or commas
+ pattern = re.compile(r'(\[[\w\s,-\.]+\])', re.IGNORECASE)
+ for component in pattern.findall(text):
+ components.append(component.upper())
+ text = text.replace(component, '')
+
+ # Cleanup any remaining symbols:
+ pattern = re.compile(r'^\W+(.*)', re.IGNORECASE)
+ if (pattern.search(text) is not None):
+ text = pattern.search(text).groups()[0]
+
+ # Assemble full text (JIRA ref(s), module(s), remaining text)
+ jira_prefix = ' '.join(jira_refs).strip()
+ if jira_prefix:
+ jira_prefix = jira_prefix + ": "
+ clean_text = jira_prefix + ' '.join(components).strip() + " " + text.strip()
+
+ # Replace multiple spaces with a single space, e.g. if no jira refs and/or components were included
+ clean_text = re.sub(r'\s+', ' ', clean_text.strip())
+
+ return clean_text
+
+
+def main():
+ global original_head
+
+ original_head = get_current_branch()
+
+ latest_branch = "master"
+
+ pr_num = raw_input("Which pull request would you like to merge? (e.g. 34): ")
+ pr = get_json("%s/pulls/%s" % (GITHUB_API_BASE, pr_num))
+ pr_events = get_json("%s/issues/%s/events" % (GITHUB_API_BASE, pr_num))
+
+ url = pr["url"]
+
+ pr_title = pr["title"]
+ commit_title = raw_input("Commit title [%s]: " % pr_title.encode("utf-8")).decode("utf-8")
+ if commit_title == "":
+ commit_title = pr_title
+
+ # Decide whether to use the modified title or not
+ modified_title = standardize_jira_ref(commit_title)
+ if modified_title != commit_title:
+ print "I've re-written the title as follows to match the standard format:"
+ print "Original: %s" % commit_title
+ print "Modified: %s" % modified_title
+ result = raw_input("Would you like to use the modified title? (y/n): ")
+ if result.lower() == "y":
+ commit_title = modified_title
+ print "Using modified title:"
+ else:
+ print "Using original title:"
+ print commit_title
+
+ body = pr["body"]
+ target_ref = pr["base"]["ref"]
+ user_login = pr["user"]["login"]
+ base_ref = pr["head"]["ref"]
+ pr_repo_desc = "%s/%s" % (user_login, base_ref)
+
+ # Merged pull requests don't appear as merged in the GitHub API;
+ # Instead, they're closed by asfgit.
+ merge_commits = \
+ [e for e in pr_events if e["actor"]["login"] == "asfgit" and e["event"] == "closed"]
+
+ if merge_commits:
+ merge_hash = merge_commits[0]["commit_id"]
+ message = get_json("%s/commits/%s" % (GITHUB_API_BASE, merge_hash))["commit"]["message"]
+
+ print "Pull request %s has already been merged, assuming you want to backport" % pr_num
+ commit_is_downloaded = run_cmd(['git', 'rev-parse', '--quiet', '--verify',
+ "%s^{commit}" % merge_hash]).strip() != ""
+ if not commit_is_downloaded:
+ fail("Couldn't find any merge commit for #%s, you may need to update HEAD." % pr_num)
+
+ print "Found commit %s:\n%s" % (merge_hash, message)
+ cherry_pick(pr_num, merge_hash, latest_branch)
+ sys.exit(0)
+
+ if not bool(pr["mergeable"]):
+ msg = "Pull request %s is not mergeable in its current form.\n" % pr_num + \
+ "Continue? (experts only!)"
+ continue_maybe(msg)
+
+ print ("\n=== Pull Request #%s ===" % pr_num)
+ print ("PR title\t%s\nCommit title\t%s\nSource\t\t%s\nTarget\t\t%s\nURL\t\t%s" % (
+ pr_title, commit_title, pr_repo_desc, target_ref, url))
+ continue_maybe("Proceed with merging pull request #%s?" % pr_num)
+
+ merged_refs = [target_ref]
+
+ merge_hash = merge_pr(pr_num, target_ref, commit_title, body, pr_repo_desc)
+
+ pick_prompt = "Would you like to pick %s into another branch?" % merge_hash
+ while raw_input("\n%s (y/n): " % pick_prompt).lower() == "y":
+ merged_refs = merged_refs + [cherry_pick(pr_num, merge_hash, latest_branch)]
+
+ if JIRA_IMPORTED:
+ if JIRA_USERNAME and JIRA_PASSWORD:
+ continue_maybe("Would you like to update an associated JIRA?")
+ jira_comment = "Issue resolved by pull request %s\n[%s/%s]" % (pr_num, GITHUB_BASE, pr_num)
+ resolve_jira_issue(commit_title, merged_refs, jira_comment)
+ else:
+ print "JIRA_USERNAME and JIRA_PASSWORD not set"
+ print "Exiting without trying to close the associated JIRA."
+ else:
+ print "Could not find jira-python library. Run 'sudo pip install jira' to install."
+ print "Exiting without trying to close the associated JIRA."
+
+
+if __name__ == "__main__":
+ main()
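fix_version_from_branch above drives the default JIRA fix-version prompt. A standalone Python 3 replica (an illustration only; the function name here is ours) makes the branch-to-version mapping concrete:

```python
def default_fix_version(branch, unreleased):
    """Replica of the script's fix_version_from_branch helper.
    `unreleased` is a list of version names sorted newest-first.
    master maps to the newest unreleased version; a release branch
    maps to the oldest version whose name starts with the branch
    name; anything else maps to None."""
    if branch == "master":
        return unreleased[0] if unreleased else None
    matches = [v for v in unreleased if v.startswith(branch)]
    return matches[-1] if matches else None
```

For example, with unreleased versions ["0.14.0", "0.13.1", "0.13.0"], branch "master" yields "0.14.0" while branch "0.13" yields "0.13.0".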
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/bin/run-wikipedia-zk-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-wikipedia-zk-application.sh b/bin/run-wikipedia-zk-application.sh
new file mode 100755
index 0000000..6feea52
--- /dev/null
+++ b/bin/run-wikipedia-zk-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p $EXECUTION_PLAN_DIR
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh samza.examples.wikipedia.application.WikipediaZkLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-application-local-runner.properties
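The launcher above sets -Dlog4j.configuration only when JAVA_OPTS does not already contain one. The same append-if-absent guard, sketched in Python (the function name is assumed for illustration):

```python
def ensure_log4j_config(java_opts, config_url):
    """Return java_opts with a -Dlog4j.configuration flag appended,
    unless one is already present (mirrors the shell test
    [[ $JAVA_OPTS != *-Dlog4j.configuration* ]])."""
    if "-Dlog4j.configuration" in java_opts:
        return java_opts  # caller already chose a log4j config
    flag = "-Dlog4j.configuration=" + config_url
    return (java_opts + " " + flag).strip()
```

The guard keeps the script idempotent: a user who exports their own log4j configuration is never overridden.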
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 776ff36..ec451d5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -30,6 +30,7 @@ task wrapper(type: Wrapper) {
version = "$SAMZA_VERSION"
repositories {
+ mavenLocal()
mavenCentral()
maven { url "https://repository.apache.org/content/groups/public" }
}
@@ -70,6 +71,7 @@ task distTar(dependsOn: build, type: Tar) {
include "wikipedia-feed.properties"
include "wikipedia-parser.properties"
include "wikipedia-stats.properties"
+ include "wikipedia-application.properties"
// expand the Maven tokens with Gradle equivalents. Also change 'target' (Maven) to 'build/distributions' (Gradle)
filter { String line ->
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 8e36b98..8b71e61 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,8 +17,8 @@
* under the License.
*/
-SAMZA_VERSION=0.12.0
-KAFKA_VERSION=0.10.0.1
+SAMZA_VERSION=0.13.0
+KAFKA_VERSION=0.10.1.1
HADOOP_VERSION=2.6.1
SLF4J_VERSION = 1.7.7
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6b02f6f..da7ec90 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
<groupId>org.apache.samza</groupId>
<artifactId>hello-samza</artifactId>
- <version>0.12.0</version>
+ <version>0.13.0</version>
<packaging>jar</packaging>
<name>Samza Example</name>
<description>
@@ -81,7 +81,7 @@ under the License.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
- <version>0.10.0.1</version>
+ <version>0.10.1.1</version>
</dependency>
<dependency>
<groupId>org.schwering</groupId>
@@ -143,7 +143,7 @@ under the License.
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>0.12.0</samza.version>
+ <samza.version>0.13.0</samza.version>
<hadoop.version>2.6.1</hadoop.version>
</properties>
@@ -240,8 +240,8 @@ under the License.
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.8</source>
+ <target>1.8</target>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 3f2e4a8..c04ace0 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -28,28 +28,25 @@
<include>NOTICE*</include>
</includes>
</fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/config</directory>
+ <includes>
+ <include>*.properties</include>
+ </includes>
+ <outputDirectory>config</outputDirectory>
+ <!-- filtered=true, so we do variable expansion so the yarn package path
+ always points to the correct spot on any machine -->
+ <filtered>true</filtered>
+ </fileSet>
</fileSets>
<files>
<file>
<source>${basedir}/src/main/resources/log4j.xml</source>
<outputDirectory>lib</outputDirectory>
</file>
- <!-- filtered=true, so we do variable expansion so the yarn package path
- always points to the correct spot on any machine -->
<file>
- <source>${basedir}/src/main/config/wikipedia-feed.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/wikipedia-parser.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/wikipedia-stats.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
+ <source>${basedir}/bin/run-wikipedia-zk-application.sh</source>
+ <outputDirectory>bin</outputDirectory>
</file>
</files>
<dependencySets>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/pageview-adclick-joiner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-adclick-joiner.properties b/src/main/config/pageview-adclick-joiner.properties
new file mode 100644
index 0000000..81ec3f6
--- /dev/null
+++ b/src/main/config/pageview-adclick-joiner.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-adclick-joiner
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewAdClickJoiner
+task.inputs=kafka.pageview-join-input,kafka.adclick-join-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2
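The cookbook configuration above is a plain Java properties file, which is why keys like `task.window.ms` and `job.container.count` can be read with the standard `java.util.Properties` API. A minimal sketch of loading such a fragment (the `ConfigLoadSketch` class and the inlined snippet are illustrative, not part of hello-samza):

```java
import java.io.StringReader;
import java.util.Properties;

public class ConfigLoadSketch {
    public static void main(String[] args) throws Exception {
        // Samza's PropertiesConfigFactory consumes files like the one above;
        // the underlying format is standard java.util.Properties.
        String snippet =
            "job.name=pageview-adclick-joiner\n" +
            "task.window.ms=2000\n" +
            "job.container.count=2\n";
        Properties props = new Properties();
        props.load(new StringReader(snippet));
        System.out.println(props.getProperty("job.name"));   // pageview-adclick-joiner
        // Numeric values are stored as strings and parsed by the consumer.
        long windowMs = Long.parseLong(props.getProperty("task.window.ms"));
        System.out.println(windowMs);                        // 2000
    }
}
```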
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/pageview-filter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-filter.properties b/src/main/config/pageview-filter.properties
new file mode 100644
index 0000000..b9e8d2a
--- /dev/null
+++ b/src/main/config/pageview-filter.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-filter
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewFilterApp
+task.inputs=kafka.pageview-filter-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/pageview-sessionizer.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-sessionizer.properties b/src/main/config/pageview-sessionizer.properties
new file mode 100644
index 0000000..847aa87
--- /dev/null
+++ b/src/main/config/pageview-sessionizer.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-sessionizer
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewSessionizerApp
+task.inputs=kafka.pageview-session-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/tumbling-pageview-counter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/tumbling-pageview-counter.properties b/src/main/config/tumbling-pageview-counter.properties
new file mode 100644
index 0000000..09fb131
--- /dev/null
+++ b/src/main/config/tumbling-pageview-counter.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=tumbling-pageview-counter
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.TumblingPageViewCounterApp
+task.inputs=kafka.pageview-tumbling-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/wikipedia-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties
new file mode 100644
index 0000000..1911e68
--- /dev/null
+++ b/src/main/config/wikipedia-application-local-runner.properties
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.name=wikipedia-application
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.default.system=kafka
+job.coordinator.zk.connect=localhost:2181
+
+# Task/Application
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
+systems.kafka.default.stream.samza.msg.serde=json
+
+# Streams
+streams.en-wikipedia.samza.system=wikipedia
+streams.en-wikipedia.samza.physical.name=#en.wikipedia
+
+streams.en-wiktionary.samza.system=wikipedia
+streams.en-wiktionary.samza.physical.name=#en.wiktionary
+
+streams.en-wikinews.samza.system=wikipedia
+streams.en-wikinews.samza.physical.name=#en.wikinews
+
+# Key-value storage
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
+stores.wikipedia-stats.key.serde=string
+stores.wikipedia-stats.msg.serde=integer
+
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/wikipedia-application.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application.properties b/src/main/config/wikipedia-application.properties
new file mode 100644
index 0000000..aeb8069
--- /dev/null
+++ b/src/main/config/wikipedia-application.properties
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Application / Job
+app.class=samza.examples.wikipedia.application.WikipediaApplication
+app.runner.class=org.apache.samza.runtime.RemoteApplicationRunner
+
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=wikipedia-application
+job.default.system=kafka
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
+systems.kafka.default.stream.samza.msg.serde=json
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Streams which are not on default system or have special characters in the physical name.
+streams.en-wikipedia.samza.system=wikipedia
+streams.en-wikipedia.samza.physical.name=#en.wikipedia
+
+streams.en-wiktionary.samza.system=wikipedia
+streams.en-wiktionary.samza.physical.name=#en.wiktionary
+
+streams.en-wikinews.samza.system=wikipedia
+streams.en-wikinews.samza.physical.name=#en.wikinews
+
+# Key-value storage
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
+stores.wikipedia-stats.key.serde=string
+stores.wikipedia-stats.msg.serde=integer
+
+# Metrics
+metrics.reporters=snapshot,jmx
+metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+metrics.reporter.snapshot.stream=kafka.metrics
+metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
+
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/wikipedia-parser.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-parser.properties b/src/main/config/wikipedia-parser.properties
index 6d1e3df..e8f3fa0 100644
--- a/src/main/config/wikipedia-parser.properties
+++ b/src/main/config/wikipedia-parser.properties
@@ -26,12 +26,6 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-
task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
task.inputs=kafka.wikipedia-raw
-# Metrics
-metrics.reporters=snapshot,jmx
-metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
-metrics.reporter.snapshot.stream=kafka.metrics
-metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
-
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/config/wikipedia-stats.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties
index 13bab64..0a1cf31 100644
--- a/src/main/config/wikipedia-stats.properties
+++ b/src/main/config/wikipedia-stats.properties
@@ -26,10 +26,12 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-
task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask
task.inputs=kafka.wikipedia-edits
task.window.ms=10000
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka
-# Normally, this would be 3, but we have only one broker.
-task.checkpoint.replication.factor=1
+
+# Metrics
+metrics.reporters=snapshot,jmx
+metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+metrics.reporter.snapshot.stream=kafka.metrics
+metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/cookbook/AdClick.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java
new file mode 100644
index 0000000..2d15cec
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/AdClick.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.cookbook;
+
+/**
+ * Represents an ad click event.
+ */
+public class AdClick {
+ /**
+ * A unique identifier for the ad
+ */
+ private final String adId;
+ /**
+ * The user that clicked the ad
+ */
+ private final String userId;
+ /**
+ * The id of the page that the ad was served from
+ */
+ private final String pageId;
+
+ public AdClick(String message) {
+ String[] adClickFields = message.split(",");
+ this.adId = adClickFields[0];
+ this.userId = adClickFields[1];
+ this.pageId = adClickFields[2];
+ }
+
+ public String getAdId() {
+ return adId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public String getPageId() {
+ return pageId;
+ }
+
+}
\ No newline at end of file
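The AdClick class above splits a comma-separated message into its three fields by position. A standalone sketch of the same parsing, with a bounds check added (the `AdClickParseDemo` class and `parseAdClick` helper are hypothetical, not part of the project):

```java
public class AdClickParseDemo {
    // Mirrors AdClick's constructor: splits "adId,userId,pageId" on commas.
    static String[] parseAdClick(String message) {
        String[] fields = message.split(",");
        if (fields.length != 3) {
            throw new IllegalArgumentException(
                "expected adId,userId,pageId but got: " + message);
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] click = parseAdClick("adClickId1,user1,google.com");
        // fields by position: [0]=adId, [1]=userId, [2]=pageId
        System.out.println(click[2]); // google.com
    }
}
```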
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/cookbook/PageView.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java
new file mode 100644
index 0000000..7803db7
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageView.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+/**
+ * Represents a Page view event
+ */
+class PageView {
+ /**
+ * The user that viewed the page
+ */
+ private final String userId;
+ /**
+ * The region that the page was viewed from
+ */
+ private final String country;
+ /**
+ * A trackingId for the page
+ */
+ private final String pageId;
+
+ /**
+ * Constructs a {@link PageView} from the provided string.
+ *
+ * @param message in the following CSV format - userId,country,url
+ */
+ PageView(String message) {
+ String[] pageViewFields = message.split(",");
+ userId = pageViewFields[0];
+ country = pageViewFields[1];
+ pageId = pageViewFields[2];
+ }
+
+ String getUserId() {
+ return userId;
+ }
+
+ String getCountry() {
+ return country;
+ }
+
+ String getPageId() {
+ return pageId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
new file mode 100644
index 0000000..94c7bc3
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.function.Function;
+
+/**
+ * In this example, we join a stream of page views with a stream of ad clicks. This is useful, for instance,
+ * for analyzing which pages served the ads that were clicked.
+ *
+ * <p> Concepts covered: Performing stream to stream Joins.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ * <li>
+ * Ensure that the topics "pageview-join-input", "adclick-join-input" are created <br/>
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1 <br/>
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic adclick-join-input --partitions 2 --replication-factor 1
+ * </li>
+ * <li>
+ * Run the application using the ./bin/run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ * --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties
+ * </li>
+ * <li>
+ * Produce some messages to the "pageview-join-input" topic <br/>
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/>
+ * user1,india,google.com <br/>
+ * user2,china,yahoo.com
+ * </li>
+ * <li>
+ * Produce some messages to the "adclick-join-input" topic with the same pageId <br/>
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic adclick-join-input --broker-list localhost:9092 <br/>
+ * adClickId1,user1,google.com <br/>
+ * adClickId2,user1,yahoo.com
+ * </li>
+ * <li>
+ * Consume messages from the "pageview-adclick-join-output" topic (e.g. bin/kafka-console-consumer.sh)
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output <br/>
+ * --property print.key=true
+ * </li>
+ * </ol>
+ *
+ */
+public class PageViewAdClickJoiner implements StreamApplication {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PageViewAdClickJoiner.class);
+ private static final String INPUT_TOPIC1 = "pageview-join-input";
+ private static final String INPUT_TOPIC2 = "adclick-join-input";
+
+ private static final String OUTPUT_TOPIC = "pageview-adclick-join-output";
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC1, (k, v) -> v);
+ MessageStream<String> adClicks = graph.<String, String, String>getInputStream(INPUT_TOPIC2, (k, v) -> v);
+
+ OutputStream<String, String, String> outputStream = graph
+ .getOutputStream(OUTPUT_TOPIC, m -> "", m -> m);
+
+ Function<String, String> pageViewKeyFn = pageView -> new PageView(pageView).getPageId();
+ Function<String, String> adClickKeyFn = adClick -> new AdClick(adClick).getPageId();
+
+ MessageStream<String> pageViewRepartitioned = pageViews.partitionBy(pageViewKeyFn);
+ MessageStream<String> adClickRepartitioned = adClicks.partitionBy(adClickKeyFn);
+
+ pageViewRepartitioned.join(adClickRepartitioned, new JoinFunction<String, String, String, String>() {
+
+ @Override
+ public String apply(String pageViewMsg, String adClickMsg) {
+ PageView pageView = new PageView(pageViewMsg);
+ AdClick adClick = new AdClick(adClickMsg);
+ String joinResult = String.format("%s,%s,%s", pageView.getPageId(), pageView.getCountry(), adClick.getAdId());
+ return joinResult;
+ }
+
+ @Override
+ public String getFirstKey(String msg) {
+ return new PageView(msg).getPageId();
+ }
+
+ @Override
+ public String getSecondKey(String msg) {
+ return new AdClick(msg).getPageId();
+ }
+ }, Duration.ofMinutes(3)).sendTo(outputStream);
+ }
+}
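The JoinFunction above correlates the two streams on the page id returned by getFirstKey/getSecondKey and emits `pageId,country,adId`. An in-memory sketch of that keyed join, without Samza's partitioning or 3-minute window (the `JoinSketch` class and `joinByPageId` helper are illustrative assumptions):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinSketch {
    // Simplified, in-memory version of the pageId-keyed join that
    // PageViewAdClickJoiner performs over a 3-minute window.
    static List<String> joinByPageId(List<String> pageViews, List<String> adClicks) {
        // Index page views by pageId (field 2 of "userId,country,pageId").
        Map<String, String> viewsByPage = new HashMap<>();
        for (String pv : pageViews) {
            viewsByPage.put(pv.split(",")[2], pv);
        }
        List<String> joined = new ArrayList<>();
        for (String ac : adClicks) {
            String[] click = ac.split(",");          // adId,userId,pageId
            String pv = viewsByPage.get(click[2]);
            if (pv != null) {
                String[] view = pv.split(",");
                // Same output format as the JoinFunction's apply(): pageId,country,adId
                joined.add(String.format("%s,%s,%s", view[2], view[1], click[0]));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> out = joinByPageId(
            Arrays.asList("user1,india,google.com", "user2,china,yahoo.com"),
            Arrays.asList("adClickId1,user1,google.com"));
        System.out.println(out); // [google.com,india,adClickId1]
    }
}
```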
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
new file mode 100644
index 0000000..cb39553
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Function;
+
+/**
+ * In this example, we demonstrate re-partitioning a stream of page views and filtering out some bad events in the stream.
+ *
+ * <p>Concepts covered: Using stateless operators on a stream, Re-partitioning a stream.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ * <li>
+ * Ensure that the topic "pageview-filter-input" is created <br/>
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1
+ * </li>
+ * <li>
+ * Run the application using the ./bin/run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ * --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties
+ * </li>
+ * <li>
+ * Produce some messages to the "pageview-filter-input" topic <br/>
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-filter-input --broker-list localhost:9092 <br/>
+ * user1,india,google.com <br/>
+ * user2,china,yahoo.com
+ * </li>
+ * <li>
+ * Consume messages from the "pageview-filter-output" topic <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output <br/>
+ * --property print.key=true </li>
+ * </ol>
+ *
+ */
+public class PageViewFilterApp implements StreamApplication {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PageViewFilterApp.class);
+ private static final String FILTER_KEY = "badKey";
+ private static final String INPUT_TOPIC = "pageview-filter-input";
+ private static final String OUTPUT_TOPIC = "pageview-filter-output";
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
+
+ Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
+
+ OutputStream<String, String, String> outputStream = graph
+ .getOutputStream(OUTPUT_TOPIC, keyFn, m -> m);
+
+ FilterFunction<String> filterFn = pageView -> !FILTER_KEY.equals(new PageView(pageView).getUserId());
+
+ pageViews
+ .partitionBy(keyFn)
+ .filter(filterFn)
+ .sendTo(outputStream);
+ }
+}
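The `PageView` model referenced by these cookbook apps is not part of this diff. A minimal sketch of what it plausibly looks like, assuming the comma-separated `userId,country,url` wire format shown in the producer step of the javadoc (class shape and accessor names here are illustrative, not the committed implementation):

```java
// Hypothetical sketch of the PageView model used by the cookbook examples.
// Assumes the "userId,country,url" message format from the javadoc,
// e.g. "user1,india,google.com".
class PageView {
  private final String userId;
  private final String country;
  private final String url;

  PageView(String message) {
    // Split the raw Kafka message into its three fields.
    String[] parts = message.split(",");
    this.userId = parts[0];
    this.country = parts[1];
    this.url = parts[2];
  }

  String getUserId() { return userId; }
  String getCountry() { return country; }
  String getUrl() { return url; }
}
```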
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
new file mode 100644
index 0000000..7ec4f9d
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.function.Function;
+
+/**
+ * In this example, we group page views by userId into sessions, and compute the number of page views for each user
+ * session. A session is considered closed when there is no user activity for a 3 second duration.
+ *
+ * <p>Concepts covered: Using session windows to group data in a stream, Re-partitioning a stream.
+ *
+ * To run the example below:
+ *
+ * <ol>
+ * <li>
+ * Ensure that the topic "pageview-session-input" is created <br/>
+ * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1
+ * </li>
+ * <li>
+ * Run the application using the ./bin/run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ * --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties
+ * </li>
+ * <li>
+ * Produce some messages to the "pageview-session-input" topic <br/>
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-session-input --broker-list localhost:9092 <br/>
+ * user1,india,google.com <br/>
+ * user2,china,yahoo.com
+ * </li>
+ * <li>
+ * Consume messages from the "pageview-session-output" topic <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-session-output <br/>
+ * --property print.key=true
+ * </li>
+ * </ol>
+ *
+ */
+public class PageViewSessionizerApp implements StreamApplication {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PageViewSessionizerApp.class);
+ private static final String INPUT_TOPIC = "pageview-session-input";
+ private static final String OUTPUT_TOPIC = "pageview-session-output";
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
+
+ OutputStream<String, String, WindowPane<String, Collection<String>>> outputStream = graph
+ .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> String.valueOf(m.getMessage().size()));
+
+ Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
+
+ pageViews
+ .partitionBy(keyFn)
+ .window(Windows.keyedSessionWindow(keyFn, Duration.ofSeconds(3)))
+ .sendTo(outputStream);
+ }
+}
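Outside Samza, the session semantics above (a session closes after 3 seconds of inactivity) can be modeled as a gap-based grouping over event timestamps. This is an illustrative model of what `Windows.keyedSessionWindow` computes per key, not its implementation:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Illustrative gap-based sessionization: consecutive timestamps closer than
// the gap belong to one session; a quiet period >= gap closes the session.
class Sessionizer {
  static List<List<Long>> sessionize(List<Long> timestampsMillis, Duration gap) {
    List<List<Long>> sessions = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    Long previous = null;
    for (long ts : timestampsMillis) {
      if (previous != null && ts - previous >= gap.toMillis()) {
        // Inactivity gap reached: close the current session.
        sessions.add(current);
        current = new ArrayList<>();
      }
      current.add(ts);
      previous = ts;
    }
    if (!current.isEmpty()) {
      sessions.add(current);
    }
    return sessions;
  }
}
```

With a 3-second gap, events at 0s, 1s, and 2s form one session, and events at 6s and 7s form a second.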
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
new file mode 100644
index 0000000..1bc6ff4
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.function.Function;
+
+/**
+ * In this example, we group a stream of page views by country, and compute the number of page views over a tumbling time
+ * window.
+ *
+ * <p> Concepts covered: Performing Group-By style aggregations on tumbling time windows.
+ *
+ * <p> Tumbling windows divide a stream into a set of contiguous, fixed-sized, non-overlapping time intervals.
+ *
+ * To run the example below:
+ *
+ * <ol>
+ * <li>
+ * Ensure that the topic "pageview-tumbling-input" is created <br/>
+ * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1
+ * </li>
+ * <li>
+ * Run the application using the ./bin/run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ * --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties
+ * </li>
+ * <li>
+ * Produce some messages to the "pageview-tumbling-input" topic <br/>
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-tumbling-input --broker-list localhost:9092 <br/>
+ * user1,india,google.com <br/>
+ * user2,china,yahoo.com
+ * </li>
+ * <li>
+ * Consume messages from the "pageview-tumbling-output" topic <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output --property print.key=true <br/>
+ * </li>
+ * </ol>
+ *
+ */
+public class TumblingPageViewCounterApp implements StreamApplication {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TumblingPageViewCounterApp.class);
+ private static final String INPUT_TOPIC = "pageview-tumbling-input";
+ private static final String OUTPUT_TOPIC = "pageview-tumbling-output";
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
+
+ OutputStream<String, String, WindowPane<String, Integer>> outputStream = graph
+ .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> m.getMessage().toString());
+
+ Function<String, String> keyFn = pageView -> new PageView(pageView).getCountry();
+
+ pageViews
+ .partitionBy(keyFn)
+ .window(Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(3), () -> 0, (m, prevCount) -> prevCount + 1))
+ .sendTo(outputStream);
+ }
+}
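For illustration, the counting fold passed to `Windows.keyedTumblingWindow` above (initializer `() -> 0`, fold `(m, prevCount) -> prevCount + 1`) behaves like a plain per-key count of the messages in a window. A stand-alone sketch (helper class name is made up for the sketch):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustrative per-key count, mirroring the initializer and fold function
// passed to Windows.keyedTumblingWindow above: start at 0 for each key and
// add 1 per message.
class KeyedCountFold {
  static <M> Map<String, Integer> countByKey(List<M> messages, Function<M, String> keyFn) {
    Map<String, Integer> counts = new HashMap<>();
    for (M m : messages) {
      // Equivalent to: initial value 0, then (m, prev) -> prev + 1
      counts.merge(keyFn.apply(m), 1, Integer::sum);
    }
    return counts;
  }
}
```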
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
new file mode 100644
index 0000000..c320209
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.application;
+
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import samza.examples.wikipedia.model.WikipediaParser;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+
+
+/**
+ * This {@link StreamApplication} demonstrates the Samza fluent API by performing the same operations as
+ * {@link samza.examples.wikipedia.task.WikipediaFeedStreamTask},
+ * {@link samza.examples.wikipedia.task.WikipediaParserStreamTask}, and
+ * {@link samza.examples.wikipedia.task.WikipediaStatsStreamTask} in one expression.
+ *
+ * The only functional difference is the lack of "wikipedia-raw" and "wikipedia-edits"
+ * streams to connect the operators, as they are not needed with the fluent API.
+ *
+ * The application processes Wikipedia events in the following steps:
+ * <ul>
+ * <li>Merge wikipedia, wiktionary, and wikinews events into one stream</li>
+ * <li>Parse each event to a more structured format</li>
+ * <li>Aggregate some stats over a 10s window</li>
+ * <li>Format each window output for public consumption</li>
+ * <li>Send the window output to Kafka</li>
+ * </ul>
+ *
+ * All of this application logic is defined in the {@link #init(StreamGraph, Config)} method, which
+ * is invoked by the framework to load the application.
+ */
+public class WikipediaApplication implements StreamApplication {
+ private static final Logger log = LoggerFactory.getLogger(WikipediaApplication.class);
+
+ // Inputs
+ private static final String WIKIPEDIA_STREAM_ID = "en-wikipedia";
+ private static final String WIKTIONARY_STREAM_ID = "en-wiktionary";
+ private static final String WIKINEWS_STREAM_ID = "en-wikinews";
+
+ // Outputs
+ private static final String STATS_STREAM_ID = "wikipedia-stats";
+
+ // Stores
+ private static final String STATS_STORE_NAME = "wikipedia-stats";
+
+ // Metrics
+ private static final String EDIT_COUNT_KEY = "count-edits-all-time";
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ // Inputs
+ // Messages come from WikipediaConsumer so we know the type is WikipediaFeedEvent
+ // They are un-keyed, so the 'k' parameter to the msgBuilder is not used
+ MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+ MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+ MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+
+ // Output (also un-keyed, so no keyExtractor)
+ OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats = graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m);
+
+ // Merge inputs
+ MessageStream<WikipediaFeedEvent> allWikipediaEvents = MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
+
+ // Parse, update stats, prepare output, and send
+ allWikipediaEvents.map(WikipediaParser::parseEvent)
+ .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()))
+ .map(this::formatOutput)
+ .sendTo(wikipediaStats);
+ }
+
+ /**
+ * A few statistics about the incoming messages.
+ */
+ private static class WikipediaStats {
+ // Windowed stats
+ int edits = 0;
+ int byteDiff = 0;
+ Set<String> titles = new HashSet<>();
+ Map<String, Integer> counts = new HashMap<>();
+
+ // Total stats
+ int totalEdits = 0;
+
+ @Override
+ public String toString() {
+ return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
+ }
+ }
+
+ /**
+ * Updates the windowed and total stats based on each "edit" event.
+ *
+ * Uses a KeyValueStore to persist a total edit count across restarts.
+ */
+ private class WikipediaStatsAggregator implements FoldLeftFunction<Map<String, Object>, WikipediaStats> {
+
+ private KeyValueStore<String, Integer> store;
+
+ // Example metric. Running counter of the number of repeat edits of the same title within a single window.
+ private Counter repeatEdits;
+
+ /**
+ * {@inheritDoc}
+ * Override {@link org.apache.samza.operators.functions.InitableFunction#init(Config, TaskContext)} to
+ * get a KeyValueStore for persistence and the MetricsRegistry for metrics.
+ */
+ @Override
+ public void init(Config config, TaskContext context) {
+ store = (KeyValueStore<String, Integer>) context.getStore(STATS_STORE_NAME);
+ repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits");
+ }
+
+ @Override
+ public WikipediaStats apply(Map<String, Object> edit, WikipediaStats stats) {
+
+ // Update persisted total
+ Integer editsAllTime = store.get(EDIT_COUNT_KEY);
+ if (editsAllTime == null) editsAllTime = 0;
+ editsAllTime++;
+ store.put(EDIT_COUNT_KEY, editsAllTime);
+
+ // Update window stats
+ stats.edits++;
+ stats.totalEdits = editsAllTime;
+ stats.byteDiff += (Integer) edit.get("diff-bytes");
+ boolean newTitle = stats.titles.add((String) edit.get("title"));
+
+ Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags");
+ for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
+ if (Boolean.TRUE.equals(flag.getValue())) {
+ stats.counts.compute(flag.getKey(), (k, v) -> v == null ? 1 : v + 1);
+ }
+ }
+
+ if (!newTitle) {
+ repeatEdits.inc();
+ log.info("Frequent edits for title: {}", edit.get("title"));
+ }
+ return stats;
+ }
+ }
+
+ /**
+ * Format the stats for output to Kafka.
+ */
+ private Map<String, Integer> formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
+
+ WikipediaStats stats = statsWindowPane.getMessage();
+
+ Map<String, Integer> counts = new HashMap<String, Integer>(stats.counts);
+ counts.put("edits", stats.edits);
+ counts.put("edits-all-time", stats.totalEdits);
+ counts.put("bytes-added", stats.byteDiff);
+ counts.put("unique-titles", stats.titles.size());
+
+ return counts;
+ }
+}
+
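In isolation, the per-flag tally in `WikipediaStatsAggregator.apply` amounts to incrementing a counter for every flag that is true in an edit event, with each flag's count starting at 1 on first sight. A stand-alone sketch of that tally (helper class name is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-alone version of the per-flag counting performed in
// WikipediaStatsAggregator.apply: every flag set to true in an edit event
// increments that flag's running count.
class FlagCounter {
  static void addFlags(Map<String, Boolean> flags, Map<String, Integer> counts) {
    for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
      if (Boolean.TRUE.equals(flag.getValue())) {
        // merge starts the count at 1 for the first occurrence,
        // then adds 1 on each subsequent one.
        counts.merge(flag.getKey(), 1, Integer::sum);
      }
    }
  }
}
```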
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
new file mode 100644
index 0000000..51dd28f
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.application;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.Config;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.util.CommandLine;
+import org.apache.samza.util.Util;
+
+
+/**
+ * An entry point for {@link WikipediaApplication} that runs in standalone mode, using ZooKeeper for coordination.
+ * It waits for the job to finish; the job can also be ended by killing this process.
+ */
+public class WikipediaZkLocalApplication {
+
+ /**
+ * Executes the application using the local application runner.
+ * It takes two required command line arguments:
+ * config-factory: the fully-qualified config factory class name, e.g. {@link org.apache.samza.config.factories.PropertiesConfigFactory}
+ * config-path: the path to the application properties file
+ *
+ * @param args command line arguments
+ */
+ public static void main(String[] args) {
+ CommandLine cmdLine = new CommandLine();
+ OptionSet options = cmdLine.parser().parse(args);
+ Config config = cmdLine.loadConfig(options);
+
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+ WikipediaApplication app = new WikipediaApplication();
+
+ runner.run(app);
+ runner.waitForFinish();
+ }
+}