You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/09 01:25:42 UTC
[01/33] samza-hello-samza git commit: SAMZA-604: Provide build.gradle
script for hello-samza project
Repository: samza-hello-samza
Updated Branches:
refs/heads/master 90bbe75f2 -> f48892747
SAMZA-604: Provide build.gradle script for hello-samza project
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/3be8dbd2
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/3be8dbd2
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/3be8dbd2
Branch: refs/heads/master
Commit: 3be8dbd2f5996a5ce6c31fb8422715e56488ac2e
Parents: 9593371
Author: Ken Gidley <kg...@yahoo.com>
Authored: Fri Apr 24 16:45:46 2015 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Dec 21 16:59:44 2015 -0800
----------------------------------------------------------------------
.gitignore | 2 +
README-gradle.txt | 51 +++++++
build.gradle | 207 ++++++++++++++++++++++++++
gradle.properties | 25 ++++
gradle/wrapper/gradle-wrapper.jar | Bin 0 -> 52141 bytes
gradle/wrapper/gradle-wrapper.properties | 6 +
gradlew | 164 ++++++++++++++++++++
gradlew.bat | 90 +++++++++++
pom.xml | 4 +
9 files changed, 549 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3be8dbd2/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 0435c14..849ce6a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,5 @@ target/
*/.cache
deploy
*.swp
+build/
+.gradle/
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3be8dbd2/README-gradle.txt
----------------------------------------------------------------------
diff --git a/README-gradle.txt b/README-gradle.txt
new file mode 100644
index 0000000..a564427
--- /dev/null
+++ b/README-gradle.txt
@@ -0,0 +1,51 @@
+
+To use gradle to build/run the hello-samza project:
+
+1) The project is configured to download and use Gradle version 2.3; on the first task execution, the Gradle wrapper downloads the required Gradle jars.
+
+2) download/install yarn/kafka/zookeeper:
+
+ $ ./gradlew installGrid
+
+3) build hello-samza job package:
+
+ $ ./gradlew distTar
+
+4) deploy hello-samza project to grid:
+
+ $ ./gradlew deployHelloSamza
+
+5) start the grid (starts up yarn/kafka/zookeeper):
+
+ $ ./gradlew startGrid
+
+6) run the various Samza tasks that are part of hello-samza project:
+
+ $ ./gradlew runWikiFeed
+ $ ./gradlew runWikiParser
+ $ ./gradlew runWikiStats
+
+7) view all the current Kafka topics:
+
+ $ ./gradlew listKafkaTopics
+
+8) view the Kafka topics output by the various Samza tasks:
+
+ $ ./gradlew dumpWikiRaw
+ (output of the Kafka topic scrolls by)
+ CTRL-c
+
+ $ ./gradlew dumpWikiEdits
+ (output of the Kafka topic scrolls by)
+ CTRL-c
+
+ $ ./gradlew dumpWikiStats
+ (output of the Kafka topic scrolls by)
+ CTRL-c
+
+9) stop all the components:
+
+ $ ./gradlew stopGrid
+
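+Shortcut: the 'runWiki*' tasks depend on the tasks from steps 2-5 (installGrid, distTar, deployHelloSamza, startGrid), so running one of them directly performs those steps automatically before starting the job (step 6).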
+
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3be8dbd2/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
new file mode 100644
index 0000000..cbbd2b3
--- /dev/null
+++ b/build.gradle
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+apply plugin: 'eclipse'
+apply plugin: 'java'
+
+defaultTasks 'distTar'
+
+task wrapper(type: Wrapper) {
+ description = "Updates gradlew and supporting files."
+ gradleVersion = '2.3'
+}
+
+version = "$SAMZA_VERSION"
+
+repositories {
+ mavenCentral()
+ maven { url "https://repository.apache.org/content/groups/public" }
+}
+
+// a configuration for dependencies that need exploding into package
+configurations {
+ explode
+}
+
+dependencies {
+ compile(group: 'org.codehaus.jackson', name: 'jackson-jaxrs', version: '1.8.5')
+ compile(group: 'org.slf4j', name: 'slf4j-api', version: "$SLF4J_VERSION")
+ compile(group: 'org.slf4j', name: 'slf4j-log4j12', version: "$SLF4J_VERSION")
+ compile(group: 'org.schwering', name: 'irclib', version: '1.10')
+ compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
+ compile(group: 'org.apache.samza', name: 'samza-kv_2.10', version: "$SAMZA_VERSION")
+
+ explode (group: 'org.apache.samza', name: 'samza-shell', ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
+
+ runtime(group: 'org.apache.samza', name: 'samza-core_2.10', version: "$SAMZA_VERSION")
+ runtime(group: 'org.apache.samza', name: 'samza-log4j', version: "$SAMZA_VERSION")
+ runtime(group: 'org.apache.samza', name: 'samza-shell', version: "$SAMZA_VERSION")
+ runtime(group: 'org.apache.samza', name: 'samza-yarn_2.10', version: "$SAMZA_VERSION")
+ runtime(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.10', version: "$SAMZA_VERSION")
+ runtime(group: 'org.apache.samza', name: 'samza-kafka_2.10', version: "$SAMZA_VERSION")
+ runtime(group: 'org.apache.kafka', name: 'kafka_2.10', version: "$KAFKA_VERSION")
+ runtime(group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: "$HADOOP_VERSION")
+}
+
+// make the samza distribution .tgz file
+task distTar(dependsOn: build, type: Tar) {
+ destinationDir(new File(project.buildDir, "/distributions"))
+ compression(Compression.GZIP)
+ classifier('dist')
+ extension('tar.gz')
+ into("config") {
+ from("src/main/config") {
+ include "wikipedia-feed.properties"
+ include "wikipedia-parser.properties"
+ include "wikipedia-stats.properties"
+
+ // expand the Maven tokens with Gradle equivalents. Also change 'target' (Maven) to 'build/distributions' (Gradle)
+ filter { String line ->
+ line.replaceAll('[\$][{]basedir[}]', project.projectDir.toString()).replaceAll('[\$][{]project.artifactId[}]', project.name.toString()).replaceAll('/target/', '/build/distributions/').replaceAll('[\$][{]pom.version[}]', version)
+ }
+ }
+ }
+
+ into("bin") {
+ from {
+ configurations.explode.collect { tarTree(it) }
+ }
+ }
+
+ into("lib") {
+ from configurations.runtime
+ from configurations.runtime.artifacts.files
+ from("src/main/resources/") {
+ include "log4j.xml"
+ }
+ }
+}
+
+// install everything
+task installGrid(type: Exec) {
+ workingDir(project.projectDir)
+ commandLine("bin/grid", "install", "all")
+ outputs.upToDateWhen {
+ ["kafka", "zookeeper", "yarn"].every {
+ (new File(project.projectDir, "deploy/" + it)).exists()
+ }
+ }
+}
+
+// update the Samza job
+task deployHelloSamza(dependsOn: [distTar, installGrid], type: Sync) {
+ into(new File(project.projectDir, "/deploy/samza"))
+ from(tarTree(distTar.archivePath))
+}
+
+
+// run everything
+task startGrid(type: Exec) {
+ workingDir(project.projectDir)
+ commandLine("bin/grid", "start", "all")
+ outputs.upToDateWhen {
+ // use running zookeeper as proxy
+ File zookeeperPidFile = new File("/tmp/zookeeper/zookeeper_server.pid")
+ zookeeperPidFile.exists() &&
+ "kill -0 ${zookeeperPidFile.text}".execute().waitFor() == 0
+ }
+}
+
+// stop everything
+task stopGrid(type: Exec) {
+ workingDir(project.projectDir)
+ commandLine("bin/grid", "stop", "all")
+}
+
+//
+// Samza helpers
+//
+
+// helper task to run Samza jobs
+class SamzaTask extends DefaultTask {
+ String configFile;
+
+ @TaskAction
+ def startSamza() {
+ project.exec {
+ workingDir(project.projectDir)
+ commandLine("deploy/samza/bin/run-job.sh",
+ "--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory",
+ "--config-path=file://${project.projectDir}/deploy/samza/config/${configFile}")
+ }
+ }
+}
+
+// helper to run the Samza job to retrieve Wikipedia data
+task runWikiFeed(dependsOn: [startGrid, deployHelloSamza], type: SamzaTask) {
+ configFile("wikipedia-feed.properties")
+}
+
+// helper to run the Samza job process Wikipedia data
+task runWikiParser(dependsOn: [startGrid, deployHelloSamza], type: SamzaTask) {
+ configFile("wikipedia-parser.properties")
+}
+
+// helper to run the Samza job to summarize stats on Wikipedia edits
+task runWikiStats(dependsOn: [startGrid, deployHelloSamza], type: SamzaTask) {
+ configFile("wikipedia-stats.properties")
+}
+
+
+//
+// Kafka helpers
+//
+
+// show all Kafka topics
+task listKafkaTopics(type: Exec) {
+ workingDir(project.projectDir)
+ commandLine("deploy/kafka/bin/kafka-topics.sh",
+ "--zookeeper", "localhost:2181",
+ "--list")
+}
+
+// helper task to monitor a Kafka topic
+class KafkaDumpTask extends DefaultTask {
+ String topic;
+
+ @TaskAction
+ def dumpTopic() {
+ project.exec {
+ workingDir(project.projectDir)
+ commandLine("deploy/kafka/bin/kafka-console-consumer.sh",
+ "--zookeeper", "localhost:2181",
+ "--topic", "${topic}")
+ }
+ }
+}
+
+// helper to dump the wikipedia-raw topic
+task dumpWikiRaw(dependsOn: startGrid, type: KafkaDumpTask) {
+ topic("wikipedia-raw")
+}
+
+// helper to dump the wikipedia-edits topic
+task dumpWikiEdits(dependsOn: startGrid, type: KafkaDumpTask) {
+ topic("wikipedia-edits")
+}
+
+// helper to dump the wikipedia-stats topic
+task dumpWikiStats(dependsOn: startGrid, type: KafkaDumpTask) {
+ topic("wikipedia-stats")
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3be8dbd2/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 0000000..83b150f
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+SAMZA_VERSION=0.9.0
+KAFKA_VERSION=0.8.2.1
+HADOOP_VERSION=2.4.0
+
+SLF4J_VERSION = 1.7.7
+
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3be8dbd2/gradle/wrapper/gradle-wrapper.jar
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 0000000..085a1cd
Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3be8dbd2/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 0000000..3584fa0
--- /dev/null
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,6 @@
+#Mon Mar 23 14:55:28 PDT 2015
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.3-bin.zip
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3be8dbd2/gradlew
----------------------------------------------------------------------
diff --git a/gradlew b/gradlew
new file mode 100755
index 0000000..91a7e26
--- /dev/null
+++ b/gradlew
@@ -0,0 +1,164 @@
+#!/usr/bin/env bash
+
+##############################################################################
+##
+## Gradle start up script for UN*X
+##
+##############################################################################
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS=""
+
+APP_NAME="Gradle"
+APP_BASE_NAME=`basename "$0"`
+
+# Use the maximum available, or set MAX_FD != -1 to use that value.
+MAX_FD="maximum"
+
+warn ( ) {
+ echo "$*"
+}
+
+die ( ) {
+ echo
+ echo "$*"
+ echo
+ exit 1
+}
+
+# OS specific support (must be 'true' or 'false').
+cygwin=false
+msys=false
+darwin=false
+case "`uname`" in
+ CYGWIN* )
+ cygwin=true
+ ;;
+ Darwin* )
+ darwin=true
+ ;;
+ MINGW* )
+ msys=true
+ ;;
+esac
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched.
+if $cygwin ; then
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+fi
+
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`"/$link"
+ fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/" >&-
+APP_HOME="`pwd -P`"
+cd "$SAVED" >&-
+
+CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
+# Determine the Java command to use to start the JVM.
+if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ if [ ! -x "$JAVACMD" ] ; then
+ die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+ fi
+else
+ JAVACMD="java"
+ which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+fi
+
+# Increase the maximum file descriptors if we can.
+if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
+ MAX_FD_LIMIT=`ulimit -H -n`
+ if [ $? -eq 0 ] ; then
+ if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
+ MAX_FD="$MAX_FD_LIMIT"
+ fi
+ ulimit -n $MAX_FD
+ if [ $? -ne 0 ] ; then
+ warn "Could not set maximum file descriptor limit: $MAX_FD"
+ fi
+ else
+ warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
+ fi
+fi
+
+# For Darwin, add options to specify how the application appears in the dock
+if $darwin; then
+ GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
+fi
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin ; then
+ APP_HOME=`cygpath --path --mixed "$APP_HOME"`
+ CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+
+ # We build the pattern for arguments to be converted via cygpath
+ ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
+ SEP=""
+ for dir in $ROOTDIRSRAW ; do
+ ROOTDIRS="$ROOTDIRS$SEP$dir"
+ SEP="|"
+ done
+ OURCYGPATTERN="(^($ROOTDIRS))"
+ # Add a user-defined pattern to the cygpath arguments
+ if [ "$GRADLE_CYGPATTERN" != "" ] ; then
+ OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
+ fi
+ # Now convert the arguments - kludge to limit ourselves to /bin/sh
+ i=0
+ for arg in "$@" ; do
+ CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
+ CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
+
+ if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
+ eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
+ else
+ eval `echo args$i`="\"$arg\""
+ fi
+ i=$((i+1))
+ done
+ case $i in
+ (0) set -- ;;
+ (1) set -- "$args0" ;;
+ (2) set -- "$args0" "$args1" ;;
+ (3) set -- "$args0" "$args1" "$args2" ;;
+ (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
+ (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
+ (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
+ (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
+ (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
+ (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
+ esac
+fi
+
+# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
+function splitJvmOpts() {
+ JVM_OPTS=("$@")
+}
+eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
+JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
+
+exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3be8dbd2/gradlew.bat
----------------------------------------------------------------------
diff --git a/gradlew.bat b/gradlew.bat
new file mode 100644
index 0000000..aec9973
--- /dev/null
+++ b/gradlew.bat
@@ -0,0 +1,90 @@
+@if "%DEBUG%" == "" @echo off
+@rem ##########################################################################
+@rem
+@rem Gradle startup script for Windows
+@rem
+@rem ##########################################################################
+
+@rem Set local scope for the variables with windows NT shell
+if "%OS%"=="Windows_NT" setlocal
+
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+set DEFAULT_JVM_OPTS=
+
+set DIRNAME=%~dp0
+if "%DIRNAME%" == "" set DIRNAME=.
+set APP_BASE_NAME=%~n0
+set APP_HOME=%DIRNAME%
+
+@rem Find java.exe
+if defined JAVA_HOME goto findJavaFromJavaHome
+
+set JAVA_EXE=java.exe
+%JAVA_EXE% -version >NUL 2>&1
+if "%ERRORLEVEL%" == "0" goto init
+
+echo.
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+echo.
+echo Please set the JAVA_HOME variable in your environment to match the
+echo location of your Java installation.
+
+goto fail
+
+:findJavaFromJavaHome
+set JAVA_HOME=%JAVA_HOME:"=%
+set JAVA_EXE=%JAVA_HOME%/bin/java.exe
+
+if exist "%JAVA_EXE%" goto init
+
+echo.
+echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
+echo.
+echo Please set the JAVA_HOME variable in your environment to match the
+echo location of your Java installation.
+
+goto fail
+
+:init
+@rem Get command-line arguments, handling Windowz variants
+
+if not "%OS%" == "Windows_NT" goto win9xME_args
+if "%@eval[2+2]" == "4" goto 4NT_args
+
+:win9xME_args
+@rem Slurp the command line arguments.
+set CMD_LINE_ARGS=
+set _SKIP=2
+
+:win9xME_args_slurp
+if "x%~1" == "x" goto execute
+
+set CMD_LINE_ARGS=%*
+goto execute
+
+:4NT_args
+@rem Get arguments from the 4NT Shell from JP Software
+set CMD_LINE_ARGS=%$
+
+:execute
+@rem Setup the command line
+
+set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+
+@rem Execute Gradle
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
+
+:end
+@rem End local scope for the variables with windows NT shell
+if "%ERRORLEVEL%"=="0" goto mainEnd
+
+:fail
+rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
+rem the _cmd.exe /c_ return code!
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
+exit /b 1
+
+:mainEnd
+if "%OS%"=="Windows_NT" endlocal
+
+:omega
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3be8dbd2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c1d552f..f8cd7f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -189,6 +189,10 @@ under the License.
<exclude>**/.cache/**</exclude>
<exclude>deploy/**</exclude>
<exclude>**/.project</exclude>
+ <exclude>**/.gradle/**</exclude>
+ <exclude>**/gradle/**</exclude>
+ <exclude>**/gradlew*</exclude>
+ <exclude>**/build/**</exclude>
</excludes>
</configuration>
</plugin>
[04/33] samza-hello-samza git commit: SAMZA-718: updated the links in
README.md
Posted by xi...@apache.org.
SAMZA-718: updated the links in README.md
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/c1c153f7
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/c1c153f7
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/c1c153f7
Branch: refs/heads/master
Commit: c1c153f7cc39b9f5628973c60f2231d044bd2293
Parents: c0ef56c
Author: Aleksandar Pejakovic <a....@levi9.com>
Authored: Sat Jun 20 23:17:59 2015 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Dec 21 23:01:22 2015 -0800
----------------------------------------------------------------------
README.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c1c153f7/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 0a71cf6..4390006 100644
--- a/README.md
+++ b/README.md
@@ -7,8 +7,8 @@ Please see [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) to ge
### Pull requests and questions
-[Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) is developed as part of the [Apache Samza](http://samza.apache.org) project. Please direct questions, improvements and bug fixes there. Questions about [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) are welcome on the [dev list](http://samza.apache.org/community/mailing-lists.html) and the [Samza JIRA](https://issues.apache.org/jira/browse/SAMZA) has a hello-samza component for filing tickets.
+[Hello Samza](http://samza.apache.org/startup/hello-samza/0.10/) is developed as part of the [Apache Samza](http://samza.apache.org) project. Please direct questions, improvements and bug fixes there. Questions about [Hello Samza](http://samza.apache.org/startup/hello-samza/0.10/) are welcome on the [dev list](http://samza.apache.org/community/mailing-lists.html) and the [Samza JIRA](https://issues.apache.org/jira/browse/SAMZA) has a hello-samza component for filing tickets.
### Contribution
-To start contributing on [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) first read [Rules](http://samza.apache.org/contribute/rules.html) and [Contributor Corner](https://cwiki.apache.org/confluence/display/SAMZA/Contributor%27s+Corner). Notice that [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) git repository does not support git pull request.
\ No newline at end of file
+To start contributing on [Hello Samza](http://samza.apache.org/startup/hello-samza/0.10/) first read [Rules](http://samza.apache.org/contribute/rules.html) and [Contributor Corner](https://cwiki.apache.org/confluence/display/SAMZA/Contributor%27s+Corner). Notice that [Hello Samza](http://samza.apache.org/startup/hello-samza/0.10/) git repository does not support git pull request.
[33/33] samza-hello-samza git commit: Merge branch 'latest'
Posted by xi...@apache.org.
Merge branch 'latest'
Conflicts:
README.md
bin/deploy.sh
bin/grid
build.gradle
gradle.properties
pom.xml
src/main/assembly/src.xml
src/main/config/pageview-adclick-joiner.properties
src/main/config/pageview-filter.properties
src/main/config/pageview-sessionizer.properties
src/main/config/tumbling-pageview-counter.properties
src/main/config/wikipedia-application-local-runner.properties
src/main/config/wikipedia-application.properties
src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
src/main/java/samza/examples/cookbook/PageViewFilterApp.java
src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/f4889274
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/f4889274
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/f4889274
Branch: refs/heads/master
Commit: f48892747dea196ab80eb0c76081efa86f0ba00c
Parents: 90bbe75 4e1aea3
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Authored: Mon Jan 8 17:24:45 2018 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Mon Jan 8 17:24:45 2018 -0800
----------------------------------------------------------------------
bin/deploy.sh | 2 +-
bin/run-azure-application.sh | 30 ++++
gradle.properties | 2 +-
pom.xml | 9 +-
.../azure-application-local-runner.properties | 49 +++++++
.../config/pageview-adclick-joiner.properties | 18 +--
src/main/config/pageview-filter.properties | 18 +--
src/main/config/pageview-sessionizer.properties | 18 +--
.../config/tumbling-pageview-counter.properties | 20 +--
...ikipedia-application-local-runner.properties | 2 -
.../config/wikipedia-application.properties | 4 +-
.../samza/examples/azure/AzureApplication.java | 61 ++++++++
.../examples/azure/AzureZKLocalApplication.java | 42 ++++++
.../cookbook/PageViewAdClickJoiner.java | 108 ++++++++------
.../examples/cookbook/PageViewFilterApp.java | 47 +++---
.../cookbook/PageViewSessionizerApp.java | 59 +++++---
.../cookbook/TumblingPageViewCounterApp.java | 54 ++++---
.../samza/examples/cookbook/data/AdClick.java | 54 +++++++
.../samza/examples/cookbook/data/PageView.java | 46 ++++++
.../samza/examples/cookbook/data/Profile.java | 42 ++++++
.../examples/cookbook/data/UserPageViews.java | 51 +++++++
.../application/WikipediaApplication.java | 145 +++++++++++++------
22 files changed, 658 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f4889274/bin/deploy.sh
----------------------------------------------------------------------
diff --cc bin/deploy.sh
index 14c3a5e,51faed1..9526067
--- a/bin/deploy.sh
+++ b/bin/deploy.sh
@@@ -23,4 -23,4 +23,4 @@@ base_dir=`pwd
mvn clean package
mkdir -p $base_dir/deploy/samza
- tar -xvf $base_dir/target/hello-samza-0.13.1-dist.tar.gz -C $base_dir/deploy/samza
-tar -xvf $base_dir/target/hello-samza-0.13.1-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza
++tar -xvf $base_dir/target/hello-samza-0.14.0-dist.tar.gz -C $base_dir/deploy/samza
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f4889274/gradle.properties
----------------------------------------------------------------------
diff --cc gradle.properties
index 8b71e61,f14b8f7..e207baa
--- a/gradle.properties
+++ b/gradle.properties
@@@ -17,7 -17,7 +17,7 @@@
* under the License.
*/
- SAMZA_VERSION=0.13.0
-SAMZA_VERSION=0.14.1-SNAPSHOT
++SAMZA_VERSION=0.14.0
KAFKA_VERSION=0.10.1.1
HADOOP_VERSION=2.6.1
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f4889274/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index a933038,dbfe749..2065d7a
--- a/pom.xml
+++ b/pom.xml
@@@ -27,7 -27,7 +27,7 @@@ under the License
<groupId>org.apache.samza</groupId>
<artifactId>hello-samza</artifactId>
- <version>0.13.0</version>
- <version>0.14.1-SNAPSHOT</version>
++ <version>0.14.0</version>
<packaging>jar</packaging>
<name>Samza Example</name>
<description>
@@@ -148,7 -148,7 +153,7 @@@
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>0.13.0</samza.version>
- <samza.version>0.14.1-SNAPSHOT</samza.version>
++ <samza.version>0.14.0</samza.version>
<hadoop.version>2.6.1</hadoop.version>
</properties>
[16/33] samza-hello-samza git commit: SAMZA-1225: Add demo examples
for programming with the fluent API
Posted by xi...@apache.org.
SAMZA-1225: Add demo examples for programming with the fluent API
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/c87ed565
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/c87ed565
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/c87ed565
Branch: refs/heads/master
Commit: c87ed565fbaebf2ac88376143c65e9f52f7a8801
Parents: 4d20c2b
Author: vjagadish1989 <jv...@linkedin.com>
Authored: Wed Apr 19 16:52:22 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Thu Apr 20 18:09:50 2017 -0700
----------------------------------------------------------------------
bin/grid | 4 +-
gradle.properties | 2 +-
pom.xml | 6 +-
src/main/assembly/src.xml | 20 ++++
.../config/pageview-adclick-joiner.properties | 46 ++++++++
src/main/config/pageview-filter.properties | 46 ++++++++
src/main/config/pageview-sessionizer.properties | 46 ++++++++
.../config/tumbling-pageview-counter.properties | 46 ++++++++
.../java/samza/examples/cookbook/AdClick.java | 58 ++++++++++
.../java/samza/examples/cookbook/PageView.java | 61 ++++++++++
.../cookbook/PageViewAdClickJoiner.java | 115 +++++++++++++++++++
.../examples/cookbook/PageViewFilterApp.java | 86 ++++++++++++++
.../cookbook/PageViewSessionizerApp.java | 87 ++++++++++++++
.../cookbook/TumblingPageViewCounterApp.java | 90 +++++++++++++++
14 files changed, 707 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index ec9d210..7d2112b 100755
--- a/bin/grid
+++ b/bin/grid
@@ -35,7 +35,7 @@ DOWNLOAD_CACHE_DIR=$HOME/.samza/download
COMMAND=$1
SYSTEM=$2
-DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
+DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
@@ -96,7 +96,7 @@ install_yarn() {
install_kafka() {
mkdir -p "$DEPLOY_ROOT_DIR"
- install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.0.1
+ install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.1.1
# have to use SIGTERM since nohup on appears to ignore SIGINT
# and Kafka switched to SIGINT in KAFKA-1031.
sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 1bc7633..294875b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -18,7 +18,7 @@
*/
SAMZA_VERSION=0.13.0-SNAPSHOT
-KAFKA_VERSION=0.10.0.1
+KAFKA_VERSION=0.10.1.1
HADOOP_VERSION=2.6.1
SLF4J_VERSION = 1.7.7
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 100b2b1..9a0b54e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@ under the License.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
- <version>0.10.0.1</version>
+ <version>0.10.1.1</version>
</dependency>
<dependency>
<groupId>org.schwering</groupId>
@@ -240,8 +240,8 @@ under the License.
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.8</source>
+ <target>1.8</target>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 3f2e4a8..e280a9a 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -51,6 +51,26 @@
<outputDirectory>config</outputDirectory>
<filtered>true</filtered>
</file>
+ <file>
+ <source>${basedir}/src/main/config/tumbling-pageview-counter.properties</source>
+ <outputDirectory>config</outputDirectory>
+ <filtered>true</filtered>
+ </file>
+ <file>
+ <source>${basedir}/src/main/config/pageview-sessionizer.properties</source>
+ <outputDirectory>config</outputDirectory>
+ <filtered>true</filtered>
+ </file>
+ <file>
+ <source>${basedir}/src/main/config/pageview-filter.properties</source>
+ <outputDirectory>config</outputDirectory>
+ <filtered>true</filtered>
+ </file>
+ <file>
+ <source>${basedir}/src/main/config/pageview-adclick-joiner.properties</source>
+ <outputDirectory>config</outputDirectory>
+ <filtered>true</filtered>
+ </file>
</files>
<dependencySets>
<dependencySet>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/config/pageview-adclick-joiner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-adclick-joiner.properties b/src/main/config/pageview-adclick-joiner.properties
new file mode 100644
index 0000000..81ec3f6
--- /dev/null
+++ b/src/main/config/pageview-adclick-joiner.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-adclick-joiner
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewAdClickJoiner
+task.inputs=kafka.pageview-join-input,kafka.adclick-join-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/config/pageview-filter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-filter.properties b/src/main/config/pageview-filter.properties
new file mode 100644
index 0000000..b9e8d2a
--- /dev/null
+++ b/src/main/config/pageview-filter.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-filter
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewFilterApp
+task.inputs=kafka.pageview-filter-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/config/pageview-sessionizer.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-sessionizer.properties b/src/main/config/pageview-sessionizer.properties
new file mode 100644
index 0000000..847aa87
--- /dev/null
+++ b/src/main/config/pageview-sessionizer.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-sessionizer
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewSessionizerApp
+task.inputs=kafka.pageview-session-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/config/tumbling-pageview-counter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/tumbling-pageview-counter.properties b/src/main/config/tumbling-pageview-counter.properties
new file mode 100644
index 0000000..09fb131
--- /dev/null
+++ b/src/main/config/tumbling-pageview-counter.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=tumbling-pageview-counter
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.TumblingPageViewCounterApp
+task.inputs=kafka.pageview-tumbling-input
+task.window.ms=2000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=string
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=2
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/AdClick.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java
new file mode 100644
index 0000000..2d15cec
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/AdClick.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.cookbook;
+
+/**
+ * Represents an ad click event.
+ */
+public class AdClick {
+ /*
+ * An unique identifier for the ad
+ */
+ private final String adId;
+ /**
+ * The user that clicked the ad
+ */
+ private final String userId;
+ /**
+ * The id of the page that the ad was served from
+ */
+ private final String pageId;
+
+ public AdClick(String message) {
+ String[] adClickFields = message.split(",");
+ this.adId = adClickFields[0];
+ this.userId = adClickFields[1];
+ this.pageId = adClickFields[2];
+ }
+
+ public String getAdId() {
+ return adId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public String getPageId() {
+ return pageId;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/PageView.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java
new file mode 100644
index 0000000..7803db7
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageView.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+/**
+ * Represents a Page view event
+ */
+class PageView {
+ /**
+ * The user that viewed the page
+ */
+ private final String userId;
+ /**
+ * The region that the page was viewed from
+ */
+ private final String country;
+ /**
+ * A trackingId for the page
+ */
+ private final String pageId;
+
+ /**
+ * Constructs a {@link PageView} from the provided string.
+ *
+ * @param message in the following CSV format - userId,country,url
+ */
+ PageView(String message) {
+ String[] pageViewFields = message.split(",");
+ userId = pageViewFields[0];
+ country = pageViewFields[1];
+ pageId = pageViewFields[2];
+ }
+
+ String getUserId() {
+ return userId;
+ }
+
+ String getCountry() {
+ return country;
+ }
+
+ String getPageId() {
+ return pageId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
new file mode 100644
index 0000000..94c7bc3
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.function.Function;
+
+/**
+ * In this example, we join a stream of Page views with a stream of Ad clicks. For instance, this is helpful for
+ * analysis on what pages served an Ad that was clicked.
+ *
+ * <p> Concepts covered: Performing stream to stream Joins.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ * <li>
+ * Ensure that the topics "pageview-join-input", "adclick-join-input" are created <br/>
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1 <br/>
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic adclick-join-input --partitions 2 --replication-factor 1
+ * </li>
+ * <li>
+ * Run the application using the ./bin/run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ * --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties
+ * </li>
+ * <li>
+ * Produce some messages to the "pageview-join-input" topic (format: userId,country,pageId) <br/>
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/>
+ * user1,india,google.com <br/>
+ * user2,china,yahoo.com
+ * </li>
+ * <li>
+ * Produce some messages to the "adclick-join-input" topic (format: adId,userId,pageId), using pageIds that also
+ * appear in the page views <br/>
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic adclick-join-input --broker-list localhost:9092 <br/>
+ * adClickId1,user1,google.com <br/>
+ * adClickId2,user1,yahoo.com
+ * </li>
+ * <li>
+ * Consume messages from the "pageview-adclick-join-output" topic; each record has the form pageId,country,adId <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output <br/>
+ * --property print.key=true
+ * </li>
+ * </ol>
+ *
+ */
+public class PageViewAdClickJoiner implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PageViewAdClickJoiner.class);
+  private static final String INPUT_TOPIC1 = "pageview-join-input";
+  private static final String INPUT_TOPIC2 = "adclick-join-input";
+
+  private static final String OUTPUT_TOPIC = "pageview-adclick-join-output";
+
+  /**
+   * Builds the pipeline. Both inputs are re-partitioned by pageId and joined on pageId. The joined
+   * "pageId,country,adId" records are sent to {@link #OUTPUT_TOPIC}.
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    // Only the message value is used; the incoming key is ignored.
+    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC1, (k, v) -> v);
+    MessageStream<String> adClicks = graph.<String, String, String>getInputStream(INPUT_TOPIC2, (k, v) -> v);
+
+    // Every joined record is written with an empty key; the record itself is the message.
+    OutputStream<String, String, String> outputStream = graph
+        .getOutputStream(OUTPUT_TOPIC, m -> "", m -> m);
+
+    Function<String, String> pageViewKeyFn = pageView -> new PageView(pageView).getPageId();
+    Function<String, String> adClickKeyFn = adClick -> new AdClick(adClick).getPageId();
+
+    // Re-partition both streams on pageId so page views and ad clicks with the same pageId can be matched.
+    MessageStream<String> pageViewRepartitioned = pageViews.partitionBy(pageViewKeyFn);
+    MessageStream<String> adClickRepartitioned = adClicks.partitionBy(adClickKeyFn);
+
+    pageViewRepartitioned.join(adClickRepartitioned, new JoinFunction<String, String, String, String>() {
+
+      /** Combines a matching page view and ad click into a "pageId,country,adId" CSV record. */
+      @Override
+      public String apply(String pageViewMsg, String adClickMsg) {
+        PageView pageView = new PageView(pageViewMsg);
+        AdClick adClick = new AdClick(adClickMsg);
+        return String.format("%s,%s,%s", pageView.getPageId(), pageView.getCountry(), adClick.getAdId());
+      }
+
+      // The join keys must agree with the re-partitioning keys above, so reuse the same key functions.
+      @Override
+      public String getFirstKey(String msg) {
+        return pageViewKeyFn.apply(msg);
+      }
+
+      @Override
+      public String getSecondKey(String msg) {
+        return adClickKeyFn.apply(msg);
+      }
+      // NOTE(review): the 3-minute Duration is the join TTL, presumably how long messages from either side are
+      // retained for matching; confirm against the MessageStream#join documentation.
+    }, Duration.ofMinutes(3)).sendTo(outputStream);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
new file mode 100644
index 0000000..cb39553
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Function;
+
+/**
+ * In this example, we demonstrate re-partitioning a stream of page views and filtering out some bad events in the stream.
+ * Page views whose userId is "badKey" are treated as bad events and dropped.
+ *
+ * <p>Concepts covered: Using stateless operators on a stream, Re-partitioning a stream.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ * <li>
+ * Ensure that the topic "pageview-filter-input" is created <br/>
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1
+ * </li>
+ * <li>
+ * Run the application using the ./bin/run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ * --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties
+ * </li>
+ * <li>
+ * Produce some messages to the "pageview-filter-input" topic (format: userId,country,pageId) <br/>
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-filter-input --broker-list localhost:9092 <br/>
+ * user1,india,google.com <br/>
+ * user2,china,yahoo.com <br/>
+ * badKey,india,google.com
+ * </li>
+ * <li>
+ * Consume messages from the "pageview-filter-output" topic; the "badKey" page view should not appear <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output <br/>
+ * --property print.key=true
+ * </li>
+ * </ol>
+ *
+ */
+public class PageViewFilterApp implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PageViewFilterApp.class);
+  private static final String FILTER_KEY = "badKey";
+  private static final String INPUT_TOPIC = "pageview-filter-input";
+  private static final String OUTPUT_TOPIC = "pageview-filter-output";
+
+  /**
+   * Builds the pipeline. Page views are re-partitioned by userId, page views from {@link #FILTER_KEY} are dropped,
+   * and the remaining ones are written, keyed by userId, to {@link #OUTPUT_TOPIC}.
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    // Only the message value is used; the incoming key is ignored.
+    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
+
+    // userId is used both as the re-partitioning key and as the output message key.
+    Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
+
+    OutputStream<String, String, String> outputStream = graph
+        .getOutputStream(OUTPUT_TOPIC, keyFn, m -> m);
+
+    // Keep every page view whose userId is not the "bad" key.
+    FilterFunction<String> filterFn = pageView -> !FILTER_KEY.equals(new PageView(pageView).getUserId());
+
+    pageViews
+        .partitionBy(keyFn)
+        .filter(filterFn)
+        .sendTo(outputStream);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
new file mode 100644
index 0000000..7ec4f9d
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.function.Function;
+
+/**
+ * In this example, we group page views by userId into sessions, and compute the number of page views for each user
+ * session. A session is considered closed when there is no user activity for a 3 second duration.
+ *
+ * <p>Concepts covered: Using session windows to group data in a stream, Re-partitioning a stream.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ * <li>
+ * Ensure that the topic "pageview-session-input" is created <br/>
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1
+ * </li>
+ * <li>
+ * Run the application using the ./bin/run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ * --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties
+ * </li>
+ * <li>
+ * Produce some messages to the "pageview-session-input" topic (format: userId,country,pageId) <br/>
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-session-input --broker-list localhost:9092 <br/>
+ * user1,india,google.com <br/>
+ * user2,china,yahoo.com
+ * </li>
+ * <li>
+ * Consume messages from the "pageview-session-output" topic; each record is keyed by userId and holds the
+ * session's page view count <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-session-output <br/>
+ * --property print.key=true
+ * </li>
+ * </ol>
+ *
+ */
+public class PageViewSessionizerApp implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PageViewSessionizerApp.class);
+  private static final String INPUT_TOPIC = "pageview-session-input";
+  private static final String OUTPUT_TOPIC = "pageview-session-output";
+
+  /**
+   * Builds the pipeline. Page views are re-partitioned by userId and grouped into per-user session windows with a
+   * 3 second gap. Each window's page view count is written to {@link #OUTPUT_TOPIC}, keyed by userId.
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    // Only the message value is used; the incoming key is ignored.
+    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
+
+    // Output key is the window key (the userId); output message is the number of page views in the session.
+    OutputStream<String, String, WindowPane<String, Collection<String>>> outputStream = graph
+        .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> String.valueOf(m.getMessage().size()));
+
+    Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
+
+    pageViews
+        .partitionBy(keyFn)
+        .window(Windows.keyedSessionWindow(keyFn, Duration.ofSeconds(3)))
+        .sendTo(outputStream);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
new file mode 100644
index 0000000..1bc6ff4
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.function.Function;
+
+/**
+ * In this example, we group a stream of page views by country, and compute the number of page views over a tumbling time
+ * window of 3 seconds.
+ *
+ * <p> Concepts covered: Performing Group-By style aggregations on tumbling time windows.
+ *
+ * <p> Tumbling windows divide a stream into a set of contiguous, fixed-sized, non-overlapping time intervals.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ * <li>
+ * Ensure that the topic "pageview-tumbling-input" is created <br/>
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1
+ * </li>
+ * <li>
+ * Run the application using the ./bin/run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ * --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties
+ * </li>
+ * <li>
+ * Produce some messages to the "pageview-tumbling-input" topic (format: userId,country,pageId) <br/>
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-tumbling-input --broker-list localhost:9092 <br/>
+ * user1,india,google.com <br/>
+ * user2,china,yahoo.com
+ * </li>
+ * <li>
+ * Consume messages from the "pageview-tumbling-output" topic; each record is keyed by country and holds the
+ * window's page view count <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output <br/>
+ * --property print.key=true
+ * </li>
+ * </ol>
+ *
+ */
+public class TumblingPageViewCounterApp implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TumblingPageViewCounterApp.class);
+  private static final String INPUT_TOPIC = "pageview-tumbling-input";
+  private static final String OUTPUT_TOPIC = "pageview-tumbling-output";
+
+  /**
+   * Builds the pipeline. Page views are re-partitioned by country and counted per country in 3 second tumbling
+   * windows. The counts are written to {@link #OUTPUT_TOPIC}, keyed by country.
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    // Only the message value is used; the incoming key is ignored.
+    MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
+
+    // Output key is the window key (the country); output message is the count accumulated for that window.
+    OutputStream<String, String, WindowPane<String, Integer>> outputStream = graph
+        .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> m.getMessage().toString());
+
+    Function<String, String> keyFn = pageView -> new PageView(pageView).getCountry();
+
+    // Each window starts at 0 and adds 1 per page view, i.e. it counts page views.
+    pageViews
+        .partitionBy(keyFn)
+        .window(Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(3), () -> 0, (m, prevCount) -> prevCount + 1))
+        .sendTo(outputStream);
+  }
+}
[32/33] samza-hello-samza git commit: SAMZA-1550: Update samza version to 0.14.1-SNAPSHOT
Posted by xi...@apache.org.
SAMZA-1550: Update samza version to 0.14.1-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/4e1aea38
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/4e1aea38
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/4e1aea38
Branch: refs/heads/master
Commit: 4e1aea380872f4c1bf09ca7193109eb6d4091800
Parents: befe934
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Authored: Mon Jan 8 17:05:25 2018 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Mon Jan 8 17:05:25 2018 -0800
----------------------------------------------------------------------
gradle.properties | 2 +-
pom.xml | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/4e1aea38/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 800e15b..f14b8f7 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,7 +17,7 @@
* under the License.
*/
-SAMZA_VERSION=0.13.1-SNAPSHOT
+SAMZA_VERSION=0.14.1-SNAPSHOT
KAFKA_VERSION=0.10.1.1
HADOOP_VERSION=2.6.1
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/4e1aea38/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5b2eb55..dbfe749 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
<groupId>org.apache.samza</groupId>
<artifactId>hello-samza</artifactId>
- <version>0.13.1-SNAPSHOT</version>
+ <version>0.14.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Samza Example</name>
<description>
@@ -148,7 +148,7 @@ under the License.
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>0.13.1-SNAPSHOT</samza.version>
+ <samza.version>0.14.1-SNAPSHOT</samza.version>
<hadoop.version>2.6.1</hadoop.version>
</properties>
[12/33] samza-hello-samza git commit: Updating samza version to
0.11.0-SNAPSHOT
Posted by xi...@apache.org.
Updating samza version to 0.11.0-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/b240b65f
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/b240b65f
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/b240b65f
Branch: refs/heads/master
Commit: b240b65fd11a9ae5d52703e819d4c5015053d1e3
Parents: 3124494
Author: Navina Ramesh <nr...@linkedin.com>
Authored: Mon Aug 8 14:09:47 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Mon Aug 8 14:09:47 2016 -0700
----------------------------------------------------------------------
gradle.properties | 2 +-
pom.xml | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b240b65f/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index b7729bd..fe71c66 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,7 +17,7 @@
* under the License.
*/
-SAMZA_VERSION=0.10.1-SNAPSHOT
+SAMZA_VERSION=0.11.0-SNAPSHOT
KAFKA_VERSION=0.8.2.1
HADOOP_VERSION=2.6.1
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b240b65f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9745c93..9cc44de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
<groupId>org.apache.samza</groupId>
<artifactId>hello-samza</artifactId>
- <version>0.10.1</version>
+ <version>0.11.0</version>
<packaging>jar</packaging>
<name>Samza Example</name>
<description>
@@ -143,7 +143,7 @@ under the License.
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>0.10.1-SNAPSHOT</samza.version>
+ <samza.version>0.11.0-SNAPSHOT</samza.version>
<hadoop.version>2.6.1</hadoop.version>
</properties>
[31/33] samza-hello-samza git commit: Added stream-table join sample
to cookbook
Posted by xi...@apache.org.
Added stream-table join sample to cookbook
As per the subject, this is a demo of a stream-to-table join using RocksDB.
Author: Wei Song <ws...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #31 from weisong44/latest
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/befe934c
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/befe934c
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/befe934c
Branch: refs/heads/master
Commit: befe934c4ad9ba1664daa7a21c9574a52e0c5542
Parents: cb43653
Author: Wei Song <ws...@linkedin.com>
Authored: Mon Dec 18 16:11:20 2017 -0800
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Mon Dec 18 16:11:20 2017 -0800
----------------------------------------------------------------------
.../pageview-profile-table-joiner.properties | 35 +++++
.../cookbook/PageViewProfileTableJoiner.java | 132 +++++++++++++++++++
.../samza/examples/cookbook/data/Profile.java | 42 ++++++
3 files changed, 209 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/befe934c/src/main/config/pageview-profile-table-joiner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-profile-table-joiner.properties b/src/main/config/pageview-profile-table-joiner.properties
new file mode 100644
index 0000000..7cec601
--- /dev/null
+++ b/src/main/config/pageview-profile-table-joiner.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-profile-table-joiner
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewProfileTableJoiner
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/befe934c/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java b/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java
new file mode 100644
index 0000000..86deb61
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.table.Table;
+
+import samza.examples.cookbook.data.PageView;
+import samza.examples.cookbook.data.Profile;
+
+/**
+ * In this example, we join a stream of page views with a table of user profiles, which is populated from a
+ * user profile stream. For instance, this is helpful for analyses that require additional information from
+ * the user's profile.
+ *
+ * <p> Concepts covered: Performing stream-to-table joins.
+ *
+ * <p> The profile table is backed by a RocksDB store, which is local to each task. Both input streams are
+ * therefore repartitioned by userId before the table is written or joined, so that a page view and the
+ * profile it is enriched with are always handled by the same task.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ * <li>
+ * Ensure that the topics "pageview-join-input", "profile-table-input" are created <br>
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic profile-table-input --partitions 2 --replication-factor 1
+ * </li>
+ * <li>
+ * Run the application using the run-app.sh script <br>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-profile-table-joiner.properties
+ * </li>
+ * <li>
+ * Produce some messages to the "profile-table-input" topic, using the userIds that the page views below reference <br>
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic profile-table-input --broker-list localhost:9092 <br>
+ * {"userId": "user1", "company": "LNKD"} <br>
+ * {"userId": "user2", "company": "MSFT"}
+ * </li>
+ * <li>
+ * Produce some messages to the "pageview-join-input" topic <br>
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br>
+ * {"userId": "user1", "country": "india", "pageId":"google.com"} <br>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com"}
+ * </li>
+ * <li>
+ * Consume messages from the "enriched-pageview-join-output" topic <br>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic enriched-pageview-join-output
+ * </li>
+ * </ol>
+ *
+ */
+public class PageViewProfileTableJoiner implements StreamApplication {
+
+ private static final String PROFILE_TOPIC = "profile-table-input";
+ private static final String PAGEVIEW_TOPIC = "pageview-join-input";
+ private static final String OUTPUT_TOPIC = "enriched-pageview-join-output";
+
+ /**
+ * Declares the application graph: profiles are written into a RocksDB-backed table keyed by userId, and
+ * page views are joined against that table to produce {@link EnrichedPageView}s on OUTPUT_TOPIC.
+ *
+ * @param graph the stream graph on which streams, the table and operators are declared
+ * @param config the job configuration (not read by this method)
+ */
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ Serde<Profile> profileSerde = new JsonSerdeV2<>(Profile.class);
+ Serde<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
+
+ OutputStream<EnrichedPageView> joinResultStream = graph.getOutputStream(
+ OUTPUT_TOPIC, new JsonSerdeV2<>(EnrichedPageView.class));
+
+ // Typed (not raw) so the compiler checks that the records written to and joined against the table agree.
+ Table<KV<String, Profile>> profileTable = graph.getTable(new RocksDbTableDescriptor<String, Profile>("profile-table")
+ .withSerde(KVSerde.of(new StringSerde(), profileSerde)));
+
+ // Repartition profiles by userId, the same key used for page views below. Without this, a profile is
+ // stored by whichever task owns its input partition and may never be visible to the task that
+ // receives page views for that user.
+ graph.getInputStream(PROFILE_TOPIC, profileSerde)
+ .partitionBy(profile -> profile.userId, profile -> profile, KVSerde.of(new StringSerde(), profileSerde), "profile")
+ .sendTo(profileTable);
+
+ graph.getInputStream(PAGEVIEW_TOPIC, pageViewSerde)
+ .partitionBy(pv -> pv.userId, pv -> pv, KVSerde.of(new StringSerde(), pageViewSerde), "join")
+ .join(profileTable, new JoinFn())
+ .sendTo(joinResultStream);
+ }
+
+ /**
+ * Enriches a (userId, page view) message with the company from the matching (userId, profile) table record.
+ * Static because it needs no state from the enclosing application.
+ */
+ private static class JoinFn implements StreamTableJoinFunction<String, KV<String, PageView>, KV<String, Profile>, EnrichedPageView> {
+ /**
+ * @param message the repartitioned page view, keyed by userId
+ * @param record the table record for the same userId, or {@code null} if none was found
+ * @return the enriched page view, or {@code null} when {@code record} is {@code null}
+ */
+ @Override
+ public EnrichedPageView apply(KV<String, PageView> message, KV<String, Profile> record) {
+ // NOTE(review): assumes the join operator drops null results instead of emitting them -- confirm.
+ return record == null ? null :
+ new EnrichedPageView(message.getKey(), record.getValue().company, message.getValue().pageId);
+ }
+ /** @return the userId the page view was repartitioned by */
+ @Override
+ public String getMessageKey(KV<String, PageView> message) {
+ return message.getKey();
+ }
+ /** @return the userId under which the profile is stored in the table */
+ @Override
+ public String getRecordKey(KV<String, Profile> record) {
+ return record.getKey();
+ }
+ }
+
+ /**
+ * Join output: a page view annotated with the company of the user who made it.
+ */
+ public static class EnrichedPageView {
+
+ public final String userId;
+ public final String company;
+ public final String pageId;
+
+ /**
+ * @param userId the user who viewed the page
+ * @param company the company from the user's profile
+ * @param pageId the page that was viewed
+ */
+ public EnrichedPageView(String userId, String company, String pageId) {
+ this.userId = userId;
+ this.company = company;
+ this.pageId = pageId;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/befe934c/src/main/java/samza/examples/cookbook/data/Profile.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/data/Profile.java b/src/main/java/samza/examples/cookbook/data/Profile.java
new file mode 100644
index 0000000..87706fd
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/data/Profile.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook.data;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * A user profile, deserialized from JSON messages such as
+ * {@code {"userId": "user1", "company": "LNKD"}}.
+ */
+public class Profile {
+
+ /** ID of the user this profile describes. */
+ public final String userId;
+ /** Company the user belongs to. */
+ public final String company;
+
+ /**
+ * Constructs a user profile.
+ *
+ * @param userId the user ID
+ * @param company the company the user belongs to
+ */
+ public Profile(
+ @JsonProperty("userId") String userId,
+ @JsonProperty("company") String company) {
+ this.userId = userId;
+ this.company = company;
+ }
+
+}
[22/33] samza-hello-samza git commit: SAMZA-1315: Rename startup
script and fix configurations for wikipedia-zk application
Posted by xi...@apache.org.
SAMZA-1315: Rename startup script and fix configurations for wikipedia-zk application
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Reviewers: Jacob Maes <jm...@linkedin.com>
Closes #17 from bharathkk/latest
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/132a17f6
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/132a17f6
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/132a17f6
Branch: refs/heads/master
Commit: 132a17f667b4fecad1abe965a7c86ccffff38c8f
Parents: 35e017c
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Tue May 30 10:01:22 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Tue May 30 10:01:22 2017 -0700
----------------------------------------------------------------------
bin/run-wikipedia-standalone-application.sh | 30 --------------------
bin/run-wikipedia-zk-application.sh | 30 ++++++++++++++++++++
src/main/assembly/src.xml | 2 +-
...ikipedia-application-local-runner.properties | 2 +-
4 files changed, 32 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/132a17f6/bin/run-wikipedia-standalone-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-wikipedia-standalone-application.sh b/bin/run-wikipedia-standalone-application.sh
deleted file mode 100755
index 6feea52..0000000
--- a/bin/run-wikipedia-standalone-application.sh
+++ /dev/null
@@ -1,30 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-home_dir=`pwd`
-base_dir=$(dirname $0)/..
-cd $base_dir
-base_dir=`pwd`
-cd $home_dir
-
-export EXECUTION_PLAN_DIR="$base_dir/plan"
-mkdir -p $EXECUTION_PLAN_DIR
-
-[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
-
-exec $(dirname $0)/run-class.sh samza.examples.wikipedia.application.WikipediaZkLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-application-local-runner.properties
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/132a17f6/bin/run-wikipedia-zk-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-wikipedia-zk-application.sh b/bin/run-wikipedia-zk-application.sh
new file mode 100755
index 0000000..6feea52
--- /dev/null
+++ b/bin/run-wikipedia-zk-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p $EXECUTION_PLAN_DIR
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh samza.examples.wikipedia.application.WikipediaZkLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-application-local-runner.properties
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/132a17f6/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 7832af5..c04ace0 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -45,7 +45,7 @@
<outputDirectory>lib</outputDirectory>
</file>
<file>
- <source>${basedir}/bin/run-wikipedia-standalone-application.sh</source>
+ <source>${basedir}/bin/run-wikipedia-zk-application.sh</source>
<outputDirectory>bin</outputDirectory>
</file>
</files>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/132a17f6/src/main/config/wikipedia-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties
index 32b0290..1911e68 100644
--- a/src/main/config/wikipedia-application-local-runner.properties
+++ b/src/main/config/wikipedia-application-local-runner.properties
@@ -19,7 +19,7 @@
job.name=wikipedia-application
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.default.system=kafka
-coordinator.zk.connect=localhost:2181
+job.coordinator.zk.connect=localhost:2181
# Task/Application
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
[15/33] samza-hello-samza git commit: SAMZA-1099: Changes to
hello-samza (latest branch) for the 0.12 release
Posted by xi...@apache.org.
SAMZA-1099: Changes to hello-samza (latest branch) for the 0.12 release
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/4d20c2bd
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/4d20c2bd
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/4d20c2bd
Branch: refs/heads/master
Commit: 4d20c2bdee7a88a8ff72e3cf56f2f19060eb57e4
Parents: e02c956
Author: vjagadish1989 <jv...@linkedin.com>
Authored: Wed Feb 22 10:46:03 2017 -0800
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Wed Feb 22 10:46:03 2017 -0800
----------------------------------------------------------------------
bin/grid | 6 +++---
build.gradle | 12 ++++++------
gradle.properties | 4 ++--
pom.xml | 18 +++++++++---------
src/main/assembly/src.xml | 10 +++++-----
5 files changed, 25 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/4d20c2bd/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index 74ee026..ec9d210 100755
--- a/bin/grid
+++ b/bin/grid
@@ -35,7 +35,7 @@ DOWNLOAD_CACHE_DIR=$HOME/.samza/download
COMMAND=$1
SYSTEM=$2
-DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
+DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
@@ -74,7 +74,7 @@ install_samza() {
git clone git://git.apache.org/samza.git
cd samza
fi
- ./gradlew -PscalaVersion=2.10 clean publishToMavenLocal
+ ./gradlew -PscalaVersion=2.11 clean publishToMavenLocal
popd
}
@@ -96,7 +96,7 @@ install_yarn() {
install_kafka() {
mkdir -p "$DEPLOY_ROOT_DIR"
- install kafka $DOWNLOAD_KAFKA kafka_2.10-0.8.2.1
+ install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.0.1
# have to use SIGTERM since nohup on appears to ignore SIGINT
# and Kafka switched to SIGINT in KAFKA-1031.
sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/4d20c2bd/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e838b18..40505ce 100644
--- a/build.gradle
+++ b/build.gradle
@@ -46,17 +46,17 @@ dependencies {
compile(group: 'org.slf4j', name: 'slf4j-log4j12', version: "$SLF4J_VERSION")
compile(group: 'org.schwering', name: 'irclib', version: '1.10')
compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
- compile(group: 'org.apache.samza', name: 'samza-kv_2.10', version: "$SAMZA_VERSION")
+ compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
explode (group: 'org.apache.samza', name: 'samza-shell', ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
- runtime(group: 'org.apache.samza', name: 'samza-core_2.10', version: "$SAMZA_VERSION")
+ runtime(group: 'org.apache.samza', name: 'samza-core_2.11', version: "$SAMZA_VERSION")
runtime(group: 'org.apache.samza', name: 'samza-log4j', version: "$SAMZA_VERSION")
runtime(group: 'org.apache.samza', name: 'samza-shell', version: "$SAMZA_VERSION")
- runtime(group: 'org.apache.samza', name: 'samza-yarn_2.10', version: "$SAMZA_VERSION")
- runtime(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.10', version: "$SAMZA_VERSION")
- runtime(group: 'org.apache.samza', name: 'samza-kafka_2.10', version: "$SAMZA_VERSION")
- runtime(group: 'org.apache.kafka', name: 'kafka_2.10', version: "$KAFKA_VERSION")
+ runtime(group: 'org.apache.samza', name: 'samza-yarn_2.11', version: "$SAMZA_VERSION")
+ runtime(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: "$SAMZA_VERSION")
+ runtime(group: 'org.apache.samza', name: 'samza-kafka_2.11', version: "$SAMZA_VERSION")
+ runtime(group: 'org.apache.kafka', name: 'kafka_2.11', version: "$KAFKA_VERSION")
runtime(group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: "$HADOOP_VERSION")
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/4d20c2bd/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 3d8a0ea..1bc7633 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,8 +17,8 @@
* under the License.
*/
-SAMZA_VERSION=0.11.1-SNAPSHOT
-KAFKA_VERSION=0.8.2.1
+SAMZA_VERSION=0.13.0-SNAPSHOT
+KAFKA_VERSION=0.10.0.1
HADOOP_VERSION=2.6.1
SLF4J_VERSION = 1.7.7
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/4d20c2bd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d90b1f9..100b2b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
<groupId>org.apache.samza</groupId>
<artifactId>hello-samza</artifactId>
- <version>0.11.1</version>
+ <version>0.13.0</version>
<packaging>jar</packaging>
<name>Samza Example</name>
<description>
@@ -43,7 +43,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
- <artifactId>samza-core_2.10</artifactId>
+ <artifactId>samza-core_2.11</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
@@ -60,28 +60,28 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
- <artifactId>samza-yarn_2.10</artifactId>
+ <artifactId>samza-yarn_2.11</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
- <artifactId>samza-kv_2.10</artifactId>
+ <artifactId>samza-kv_2.11</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
- <artifactId>samza-kv-rocksdb_2.10</artifactId>
+ <artifactId>samza-kv-rocksdb_2.11</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
- <artifactId>samza-kafka_2.10</artifactId>
+ <artifactId>samza-kafka_2.11</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.2.1</version>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.schwering</groupId>
@@ -143,7 +143,7 @@ under the License.
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>0.11.1-SNAPSHOT</samza.version>
+ <samza.version>0.13.0-SNAPSHOT</samza.version>
<hadoop.version>2.6.1</hadoop.version>
</properties>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/4d20c2bd/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index f57fee2..3f2e4a8 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -64,14 +64,14 @@
<dependencySet>
<outputDirectory>lib</outputDirectory>
<includes>
- <include>org.apache.samza:samza-core_2.10</include>
- <include>org.apache.samza:samza-kafka_2.10</include>
- <include>org.apache.samza:samza-yarn_2.10</include>
- <include>org.apache.samza:samza-kv-rocksdb_2.10</include>
+ <include>org.apache.samza:samza-core_2.11</include>
+ <include>org.apache.samza:samza-kafka_2.11</include>
+ <include>org.apache.samza:samza-yarn_2.11</include>
+ <include>org.apache.samza:samza-kv-rocksdb_2.11</include>
<include>org.apache.samza:samza-log4j</include>
<include>org.apache.samza:hello-samza</include>
<include>org.slf4j:slf4j-log4j12</include>
- <include>org.apache.kafka:kafka_2.10</include>
+ <include>org.apache.kafka:kafka_2.11</include>
<include>org.apache.hadoop:hadoop-hdfs</include>
</includes>
<useTransitiveFiltering>true</useTransitiveFiltering>
[30/33] samza-hello-samza git commit: Fix the Kafka download link
Posted by xi...@apache.org.
Fix the Kafka download link
The link http://www.us.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz does not exist: the mirror http://us.apache.org/dist/kafka/ only hosts versions 0.10.2.1, 0.11.0.2, and 1.0.0. Changed the download URL to the Apache archive (archive.apache.org), which keeps all past releases.
Author: Sardana <sa...@mail.ru>
Reviewers: Jagadish<ja...@apache.org>
Closes #28 from capdaha/patch-1
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/cb436535
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/cb436535
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/cb436535
Branch: refs/heads/master
Commit: cb4365359e000553dd1e1c24869490e3ff41c440
Parents: 004e03f
Author: Sardana <sa...@mail.ru>
Authored: Mon Dec 11 18:25:21 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Dec 11 18:25:21 2017 -0800
----------------------------------------------------------------------
bin/grid | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/cb436535/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index 5281379..8c5b7dd 100755
--- a/bin/grid
+++ b/bin/grid
@@ -35,7 +35,7 @@ DOWNLOAD_CACHE_DIR=$HOME/.samza/download
COMMAND=$1
SYSTEM=$2
-DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
+DOWNLOAD_KAFKA=https://archive.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
[20/33] samza-hello-samza git commit: SAMZA-1236: Minor refinements
to the high level API tutorial
Posted by xi...@apache.org.
SAMZA-1236: Minor refinements to the high level API tutorial
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Bharath Kumarasubramanian <bk...@linkedin.com>
Closes #15 from jmakes/samza-1236-2
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/28af952d
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/28af952d
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/28af952d
Branch: refs/heads/master
Commit: 28af952d5430cb029b168fd360c844da8f9a60b4
Parents: 01fb456
Author: Jacob Maes <jm...@linkedin.com>
Authored: Thu May 18 12:28:13 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Thu May 18 12:28:13 2017 -0700
----------------------------------------------------------------------
.../config/wikipedia-application.properties | 24 ++++++++------------
.../application/WikipediaApplication.java | 11 ++++++---
2 files changed, 18 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/28af952d/src/main/config/wikipedia-application.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application.properties b/src/main/config/wikipedia-application.properties
index 59a124f..aeb8069 100644
--- a/src/main/config/wikipedia-application.properties
+++ b/src/main/config/wikipedia-application.properties
@@ -15,23 +15,17 @@
# specific language governing permissions and limitations
# under the License.
-# Job
+# Application / Job
+app.class=samza.examples.wikipedia.application.WikipediaApplication
+app.runner.class=org.apache.samza.runtime.RemoteApplicationRunner
+
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=wikipedia-application
+job.default.system=kafka
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
-# Task/Application
-app.runner.class=org.apache.samza.runtime.RemoteApplicationRunner
-app.class=samza.examples.wikipedia.application.WikipediaApplication
-task.window.ms=10000
-
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
-
# Wikipedia System
systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
systems.wikipedia.host=irc.wikimedia.org
@@ -44,6 +38,11 @@ systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1
systems.kafka.default.stream.samza.msg.serde=json
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
# Streams which are not on default system or have special characters in the physical name.
streams.en-wikipedia.samza.system=wikipedia
streams.en-wikipedia.samza.physical.name=#en.wikipedia
@@ -60,9 +59,6 @@ stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
stores.wikipedia-stats.key.serde=string
stores.wikipedia-stats.msg.serde=integer
-# Defaults
-job.default.system=kafka
-
# Metrics
metrics.reporters=snapshot,jmx
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/28af952d/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index 3432e3d..c320209 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -66,15 +66,20 @@ import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
public class WikipediaApplication implements StreamApplication {
private static final Logger log = LoggerFactory.getLogger(WikipediaApplication.class);
- private static final String STATS_STORE_NAME = "wikipedia-stats";
- private static final String EDIT_COUNT_KEY = "count-edits-all-time";
-
+ // Inputs
private static final String WIKIPEDIA_STREAM_ID = "en-wikipedia";
private static final String WIKTIONARY_STREAM_ID = "en-wiktionary";
private static final String WIKINEWS_STREAM_ID = "en-wikinews";
+ // Outputs
private static final String STATS_STREAM_ID = "wikipedia-stats";
+ // Stores
+ private static final String STATS_STORE_NAME = "wikipedia-stats";
+
+ // Metrics
+ private static final String EDIT_COUNT_KEY = "count-edits-all-time";
+
@Override
public void init(StreamGraph graph, Config config) {
// Inputs
[29/33] samza-hello-samza git commit: Fixed stateful operator name
Posted by xi...@apache.org.
Fixed stateful operator name
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #25 from prateekm/latest
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/004e03fa
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/004e03fa
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/004e03fa
Branch: refs/heads/master
Commit: 004e03fadc60aef0f5e0b345c3d0a43ed6b5e5dc
Parents: f48c7f9
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Mon Dec 11 18:24:10 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Dec 11 18:24:10 2017 -0800
----------------------------------------------------------------------
.../samza/examples/wikipedia/application/WikipediaApplication.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/004e03fa/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index 659e373..032608f 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -113,7 +113,7 @@ public class WikipediaApplication implements StreamApplication {
allWikipediaEvents
.map(WikipediaParser::parseEvent)
.window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new,
- new WikipediaStatsAggregator(), WikipediaStats.serde()), "Tumbling window of WikipediaStats")
+ new WikipediaStatsAggregator(), WikipediaStats.serde()), "statsWindow")
.map(this::formatOutput)
.sendTo(wikipediaStats);
}
[06/33] samza-hello-samza git commit: SAMZA-851: fix latest
hello-samza deploying to CDH 5.4
Posted by xi...@apache.org.
SAMZA-851: fix latest hello-samza deploying to CDH 5.4
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/06d62651
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/06d62651
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/06d62651
Branch: refs/heads/master
Commit: 06d62651d395f4bc7dd7b3c544077967ecc1edde
Parents: 6c26240
Author: Stanislav Los <sl...@gmail.com>
Authored: Tue Feb 9 20:19:31 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Feb 9 20:19:31 2016 -0800
----------------------------------------------------------------------
pom.xml | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 53 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/06d62651/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e541bb7..552c2a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,8 +105,38 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
- <version>2.6.1</version>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <version>${hadoop.version}</version>
</dependency>
</dependencies>
@@ -114,6 +144,7 @@ under the License.
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<samza.version>0.10.1-SNAPSHOT</samza.version>
+ <hadoop.version>2.6.1</hadoop.version>
</properties>
<developers>
@@ -157,6 +188,11 @@ under the License.
<name>Scala-tools Maven2 Repository</name>
<url>https://oss.sonatype.org/content/groups/scala-tools</url>
</repository>
+ <repository>
+ <id>cloudera-repos</id>
+ <name>Cloudera Repos</name>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+ </repository>
</repositories>
<pluginRepositories>
@@ -242,4 +278,20 @@ under the License.
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <!-- CDH compatible build: mvn clean package -Denv=cdh5.4.0 -->
+ <profile>
+ <id>cdh5.4.0</id>
+ <activation>
+ <property>
+ <name>env</name>
+ <value>cdh5.4.0</value>
+ </property>
+ </activation>
+ <properties>
+ <hadoop.version>2.6.0-cdh5.4.0</hadoop.version>
+ </properties>
+ </profile>
+ </profiles>
</project>
[18/33] samza-hello-samza git commit: SAMZA-1237: Add support for
standalone mode to wikipedia application
Posted by xi...@apache.org.
SAMZA-1237: Add support for standalone mode to wikipedia application
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Reviewers: Jacob Maes <jm...@linkedin.com>
Closes #13 from bharathkk/latest
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/591aaebc
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/591aaebc
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/591aaebc
Branch: refs/heads/master
Commit: 591aaebc4ffca24a193b068fdc526e20cc57d06b
Parents: 3d0e919
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Fri May 12 07:51:59 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri May 12 07:51:59 2017 -0700
----------------------------------------------------------------------
.gitignore | 1 +
bin/grid | 27 ++++++++-
bin/run-wikipedia-standalone-application.sh | 30 ++++++++++
src/main/assembly/src.xml | 52 ++++-------------
...ikipedia-application-local-runner.properties | 60 ++++++++++++++++++++
.../application/WikipediaApplication.java | 2 +-
.../WikipediaZkLocalApplication.java | 47 +++++++++++++++
7 files changed, 177 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 849ce6a..f31af00 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,3 +30,4 @@ deploy
*.swp
build/
.gradle/
+state
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index 7d2112b..5f715b5 100755
--- a/bin/grid
+++ b/bin/grid
@@ -39,7 +39,7 @@ DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.
DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
-SERVICE_WAIT_TIMEOUT_SEC=10
+SERVICE_WAIT_TIMEOUT_SEC=20
ZOOKEEPER_PORT=2181
RESOURCEMANAGER_PORT=8032
NODEMANAGER_PORT=8042
@@ -55,6 +55,16 @@ bootstrap() {
exit 0
}
+standalone() {
+ echo "Setting up the ystem..."
+ stop_all
+ rm -rf "$DEPLOY_ROOT_DIR"
+ mkdir "$DEPLOY_ROOT_DIR"
+ install_all_without_yarn
+ start_all_without_yarn
+ exit 0
+}
+
install_all() {
$DIR/grid install samza
$DIR/grid install zookeeper
@@ -62,6 +72,12 @@ install_all() {
$DIR/grid install kafka
}
+install_all_without_yarn() {
+ $DIR/grid install samza
+ $DIR/grid install zookeeper
+ $DIR/grid install kafka
+}
+
install_samza() {
mkdir -p "$DEPLOY_ROOT_DIR"
if [ -d "$DOWNLOAD_CACHE_DIR/samza/.git" ]; then
@@ -128,6 +144,11 @@ start_all() {
$DIR/grid start kafka
}
+start_all_without_yarn() {
+ $DIR/grid start zookeeper
+ $DIR/grid start kafka
+}
+
start_zookeeper() {
if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh ]; then
cd $DEPLOY_ROOT_DIR/$SYSTEM
@@ -218,6 +239,9 @@ stop_kafka() {
if [ "$COMMAND" == "bootstrap" ] && test -z "$SYSTEM"; then
bootstrap
exit 0
+elif [ "$COMMAND" == "standalone" ] && test -z "$SYSTEM"; then
+ standalone
+ exit 0
elif (test -z "$COMMAND" && test -z "$SYSTEM") \
|| ( [ "$COMMAND" == "help" ] || test -z "$COMMAND" || test -z "$SYSTEM"); then
echo
@@ -225,6 +249,7 @@ elif (test -z "$COMMAND" && test -z "$SYSTEM") \
echo
echo " $ grid"
echo " $ grid bootstrap"
+ echo " $ grid standalone"
echo " $ grid install [yarn|kafka|zookeeper|samza|all]"
echo " $ grid start [yarn|kafka|zookeeper|all]"
echo " $ grid stop [yarn|kafka|zookeeper|all]"
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/bin/run-wikipedia-standalone-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-wikipedia-standalone-application.sh b/bin/run-wikipedia-standalone-application.sh
new file mode 100755
index 0000000..f750e2b
--- /dev/null
+++ b/bin/run-wikipedia-standalone-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p $EXECUTION_PLAN_DIR
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh samza.examples.wikipedia.application.WikipediaZkLocalApplication "$@"
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index ca90ebf..69cbbbe 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -28,53 +28,25 @@
<include>NOTICE*</include>
</includes>
</fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/config</directory>
+ <includes>
+ <include>*.properties</include>
+ </includes>
+ <outputDirectory>config</outputDirectory>
+ <!-- filtered=true, so we do variable expansion so the yarn package path
+ always points to the correct spot on any machine -->
+ <filtered>true</filtered>
+ </fileSet>
</fileSets>
<files>
<file>
<source>${basedir}/src/main/resources/log4j.xml</source>
<outputDirectory>lib</outputDirectory>
</file>
- <!-- filtered=true, so we do variable expansion so the yarn package path
- always points to the correct spot on any machine -->
- <file>
- <source>${basedir}/src/main/config/wikipedia-feed.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/wikipedia-parser.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/wikipedia-stats.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
<file>
- <source>${basedir}/src/main/config/wikipedia-application.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/tumbling-pageview-counter.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/pageview-sessionizer.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/pageview-filter.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/pageview-adclick-joiner.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
+ <source>${basedir}/bin/run-local-app.sh</source>
+ <outputDirectory>bin</outputDirectory>
</file>
</files>
<dependencySets>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/config/wikipedia-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties
new file mode 100644
index 0000000..965a131
--- /dev/null
+++ b/src/main/config/wikipedia-application-local-runner.properties
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.name=wikipedia-application
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.default.system=kafka
+coordinator.zk.connect=localhost:2181
+
+# Task/Application
+app.processor-id-generator.class=org.apache.samza.runtime.UUIDGenerator
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactgory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
+systems.kafka.default.stream.samza.msg.serde=json
+
+# Streams
+streams.en-wikipedia.samza.system=wikipedia
+streams.en-wikipedia.samza.physical.name=#en.wikipedia
+
+streams.en-wiktionary.samza.system=wikipedia
+streams.en-wiktionary.samza.physical.name=#en.wiktionary
+
+streams.en-wikinews.samza.system=wikipedia
+streams.en-wikinews.samza.physical.name=#en.wikinews
+
+# Key-value storage
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
+stores.wikipedia-stats.key.serde=string
+stores.wikipedia-stats.msg.serde=integer
+
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index b0779db..3432e3d 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -88,7 +88,7 @@ public class WikipediaApplication implements StreamApplication {
OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats = graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m);
// Merge inputs
- MessageStream<WikipediaFeedEvent> allWikipediaEvents = wikipediaEvents.merge(ImmutableList.of(wiktionaryEvents, wikiNewsEvents));
+ MessageStream<WikipediaFeedEvent> allWikipediaEvents = MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
// Parse, update stats, prepare output, and send
allWikipediaEvents.map(WikipediaParser::parseEvent)
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
new file mode 100644
index 0000000..8e978bc
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.application;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.Config;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.util.CommandLine;
+import org.apache.samza.util.Util;
+
+
+/**
+ * An entry point for {@link WikipediaApplication} that runs in stand alone mode using zookeeper.
+ * It waits for the job to finish; The job can also be ended by killing this process.
+ */
+public class WikipediaZkLocalApplication {
+
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ OptionSet options = cmdLine.parser().parse(args);
+ Config orgConfig = cmdLine.loadConfig(options);
+ Config config = Util.rewriteConfig(orgConfig);
+
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+ WikipediaApplication app = new WikipediaApplication();
+
+ runner.run(app);
+ runner.waitForFinish();
+ }
+}
[19/33] samza-hello-samza git commit: SAMZA-1237: Modified the script
to pass properties implicitly and fixed typos in configuration
Posted by xi...@apache.org.
SAMZA-1237: Modified the script to pass properties implicitly and fixed typos in configuration
Addressed usability comments and fixed configurations. navina
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Reviewers: Jagadish <jv...@linkedin.com>
Closes #14 from bharathkk/latest
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/01fb4569
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/01fb4569
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/01fb4569
Branch: refs/heads/master
Commit: 01fb45698164ccee3ca049be859b553c636e71fa
Parents: 591aaeb
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Thu May 18 12:22:21 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Thu May 18 12:22:21 2017 -0700
----------------------------------------------------------------------
bin/grid | 2 +-
bin/run-wikipedia-standalone-application.sh | 2 +-
src/main/assembly/src.xml | 2 +-
.../wikipedia-application-local-runner.properties | 3 +--
.../application/WikipediaZkLocalApplication.java | 13 ++++++++++---
5 files changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/01fb4569/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index 5f715b5..5dff403 100755
--- a/bin/grid
+++ b/bin/grid
@@ -56,7 +56,7 @@ bootstrap() {
}
standalone() {
- echo "Setting up the ystem..."
+ echo "Setting up the system..."
stop_all
rm -rf "$DEPLOY_ROOT_DIR"
mkdir "$DEPLOY_ROOT_DIR"
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/01fb4569/bin/run-wikipedia-standalone-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-wikipedia-standalone-application.sh b/bin/run-wikipedia-standalone-application.sh
index f750e2b..6feea52 100755
--- a/bin/run-wikipedia-standalone-application.sh
+++ b/bin/run-wikipedia-standalone-application.sh
@@ -27,4 +27,4 @@ mkdir -p $EXECUTION_PLAN_DIR
[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
-exec $(dirname $0)/run-class.sh samza.examples.wikipedia.application.WikipediaZkLocalApplication "$@"
+exec $(dirname $0)/run-class.sh samza.examples.wikipedia.application.WikipediaZkLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-application-local-runner.properties
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/01fb4569/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 69cbbbe..7832af5 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -45,7 +45,7 @@
<outputDirectory>lib</outputDirectory>
</file>
<file>
- <source>${basedir}/bin/run-local-app.sh</source>
+ <source>${basedir}/bin/run-wikipedia-standalone-application.sh</source>
<outputDirectory>bin</outputDirectory>
</file>
</files>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/01fb4569/src/main/config/wikipedia-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties
index 965a131..32b0290 100644
--- a/src/main/config/wikipedia-application-local-runner.properties
+++ b/src/main/config/wikipedia-application-local-runner.properties
@@ -22,11 +22,10 @@ job.default.system=kafka
coordinator.zk.connect=localhost:2181
# Task/Application
-app.processor-id-generator.class=org.apache.samza.runtime.UUIDGenerator
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactgory
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/01fb4569/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
index 8e978bc..51dd28f 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
@@ -32,11 +32,18 @@ import org.apache.samza.util.Util;
*/
public class WikipediaZkLocalApplication {
- public static void main(String[] args) throws Exception {
+ /**
+ * Executes the application using the local application runner.
+ * It takes two required command line arguments
+ * config-factory: a fully {@link org.apache.samza.config.factories.PropertiesConfigFactory} class name
+ * config-path: path to application properties
+ *
+ * @param args command line arguments
+ */
+ public static void main(String[] args) {
CommandLine cmdLine = new CommandLine();
OptionSet options = cmdLine.parser().parse(args);
- Config orgConfig = cmdLine.loadConfig(options);
- Config config = Util.rewriteConfig(orgConfig);
+ Config config = cmdLine.loadConfig(options);
LocalApplicationRunner runner = new LocalApplicationRunner(config);
WikipediaApplication app = new WikipediaApplication();
[25/33] samza-hello-samza git commit: SAMZA-1441: Updated High Level
API examples in hello-samza to provide serdes in code.
Posted by xi...@apache.org.
SAMZA-1441: Updated High Level API examples in hello-samza to provide serdes in code.
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Author: Yan Fang <ya...@gmail.com>
Author: Prateek Maheshwari <pm...@linkedin.com>
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Author: Aleksandar Pejakovic <a....@levi9.com>
Author: Navina Ramesh <nr...@linkedin.com>
Author: vjagadish1989 <jv...@linkedin.com>
Author: Steven Aerts <st...@gmail.com>
Author: Chris Riccomini <cr...@apache.org>
Author: Manikumar Reddy <ma...@gmail.com>
Author: Yi Pan <ni...@gmail.com>
Author: Yan Fang <ya...@apache.org>
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Author: Stanislav Los <sl...@gmail.com>
Author: Ken Gidley <kg...@yahoo.com>
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Jagadish Venkatraman <ja...@apache.org>
Closes #24 from prateekm/serde-instance
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/e5943a00
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/e5943a00
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/e5943a00
Branch: refs/heads/master
Commit: e5943a000eef87e077c422e09dc20f09d4e876ca
Parents: 901c3a3
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Wed Oct 4 16:08:49 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Oct 4 16:08:49 2017 -0700
----------------------------------------------------------------------
.../config/pageview-adclick-joiner.properties | 18 +---
src/main/config/pageview-filter.properties | 18 +---
src/main/config/pageview-sessionizer.properties | 18 +---
.../config/tumbling-pageview-counter.properties | 20 +---
...ikipedia-application-local-runner.properties | 2 -
.../config/wikipedia-application.properties | 4 +-
.../java/samza/examples/cookbook/AdClick.java | 58 ----------
.../java/samza/examples/cookbook/PageView.java | 61 -----------
.../cookbook/PageViewAdClickJoiner.java | 108 +++++++++++--------
.../examples/cookbook/PageViewFilterApp.java | 47 ++++----
.../cookbook/PageViewSessionizerApp.java | 54 +++++-----
.../cookbook/TumblingPageViewCounterApp.java | 52 +++++----
.../samza/examples/cookbook/data/AdClick.java | 54 ++++++++++
.../samza/examples/cookbook/data/PageView.java | 46 ++++++++
.../examples/cookbook/data/UserPageViews.java | 51 +++++++++
.../application/WikipediaApplication.java | 98 ++++++++++-------
16 files changed, 371 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/pageview-adclick-joiner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-adclick-joiner.properties b/src/main/config/pageview-adclick-joiner.properties
index 81ec3f6..eba7b0b 100644
--- a/src/main/config/pageview-adclick-joiner.properties
+++ b/src/main/config/pageview-adclick-joiner.properties
@@ -18,29 +18,19 @@
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=pageview-adclick-joiner
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
app.class=samza.examples.cookbook.PageViewAdClickJoiner
-task.inputs=kafka.pageview-join-input,kafka.adclick-join-input
task.window.ms=2000
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=string
-systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
-job.default.system=kafka
-job.container.count=2
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/pageview-filter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-filter.properties b/src/main/config/pageview-filter.properties
index b9e8d2a..331ee1a 100644
--- a/src/main/config/pageview-filter.properties
+++ b/src/main/config/pageview-filter.properties
@@ -18,29 +18,19 @@
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=pageview-filter
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
app.class=samza.examples.cookbook.PageViewFilterApp
-task.inputs=kafka.pageview-filter-input
task.window.ms=2000
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=string
-systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
-job.default.system=kafka
-job.container.count=2
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/pageview-sessionizer.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-sessionizer.properties b/src/main/config/pageview-sessionizer.properties
index 847aa87..420cdde 100644
--- a/src/main/config/pageview-sessionizer.properties
+++ b/src/main/config/pageview-sessionizer.properties
@@ -18,29 +18,19 @@
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=pageview-sessionizer
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
app.class=samza.examples.cookbook.PageViewSessionizerApp
-task.inputs=kafka.pageview-session-input
task.window.ms=2000
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=string
-systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
-job.default.system=kafka
-job.container.count=2
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/tumbling-pageview-counter.properties
----------------------------------------------------------------------
diff --git a/src/main/config/tumbling-pageview-counter.properties b/src/main/config/tumbling-pageview-counter.properties
index 09fb131..b58dbe9 100644
--- a/src/main/config/tumbling-pageview-counter.properties
+++ b/src/main/config/tumbling-pageview-counter.properties
@@ -18,29 +18,19 @@
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=tumbling-pageview-counter
+job.container.count=2
+job.default.system=kafka
+job.coordinator.system=kafka
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
app.class=samza.examples.cookbook.TumblingPageViewCounterApp
-task.inputs=kafka.pageview-tumbling-input
task.window.ms=2000
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=string
-systems.kafka.samza.key.serde=string
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
-job.default.system=kafka
-job.container.count=2
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/wikipedia-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties
index 1911e68..b770f13 100644
--- a/src/main/config/wikipedia-application-local-runner.properties
+++ b/src/main/config/wikipedia-application-local-runner.properties
@@ -25,7 +25,6 @@ job.coordinator.zk.connect=localhost:2181
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
@@ -39,7 +38,6 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1
-systems.kafka.default.stream.samza.msg.serde=json
# Streams
streams.en-wikipedia.samza.system=wikipedia
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/wikipedia-application.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application.properties b/src/main/config/wikipedia-application.properties
index aeb8069..841fcc5 100644
--- a/src/main/config/wikipedia-application.properties
+++ b/src/main/config/wikipedia-application.properties
@@ -33,13 +33,11 @@ systems.wikipedia.port=6667
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1
-systems.kafka.default.stream.samza.msg.serde=json
# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/AdClick.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java
deleted file mode 100644
index 2d15cec..0000000
--- a/src/main/java/samza/examples/cookbook/AdClick.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.cookbook;
-
-/**
- * Represents an ad click event.
- */
-public class AdClick {
- /*
- * An unique identifier for the ad
- */
- private final String adId;
- /**
- * The user that clicked the ad
- */
- private final String userId;
- /**
- * The id of the page that the ad was served from
- */
- private final String pageId;
-
- public AdClick(String message) {
- String[] adClickFields = message.split(",");
- this.adId = adClickFields[0];
- this.userId = adClickFields[1];
- this.pageId = adClickFields[2];
- }
-
- public String getAdId() {
- return adId;
- }
-
- public String getUserId() {
- return userId;
- }
-
- public String getPageId() {
- return pageId;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageView.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java
deleted file mode 100644
index 7803db7..0000000
--- a/src/main/java/samza/examples/cookbook/PageView.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package samza.examples.cookbook;
-
-/**
- * Represents a Page view event
- */
-class PageView {
- /**
- * The user that viewed the page
- */
- private final String userId;
- /**
- * The region that the page was viewed from
- */
- private final String country;
- /**
- * A trackingId for the page
- */
- private final String pageId;
-
- /**
- * Constructs a {@link PageView} from the provided string.
- *
- * @param message in the following CSV format - userId,country,url
- */
- PageView(String message) {
- String[] pageViewFields = message.split(",");
- userId = pageViewFields[0];
- country = pageViewFields[1];
- pageId = pageViewFields[2];
- }
-
- String getUserId() {
- return userId;
- }
-
- String getCountry() {
- return country;
- }
-
- String getPageId() {
- return pageId;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
index 94c7bc3..4f491f7 100644
--- a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
+++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
@@ -20,15 +20,18 @@ package samza.examples.cookbook;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.JoinFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import samza.examples.cookbook.data.AdClick;
+import samza.examples.cookbook.data.PageView;
import java.time.Duration;
-import java.util.function.Function;
/**
* In this example, we join a stream of Page views with a stream of Ad clicks. For instance, this is helpful for
@@ -41,75 +44,94 @@ import java.util.function.Function;
* <ol>
* <li>
* Ensure that the topics "pageview-join-input", "adclick-join-input" are created <br/>
- * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic adclick-join-input --partitions 2 --replication-factor 1
* </li>
* <li>
- * Run the application using the ./bin/run-app.sh script <br/>
- * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
- * --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties)
+ * Run the application using the run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties
* </li>
* <li>
* Produce some messages to the "pageview-join-input" topic <br/>
* ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/>
- * user1,india,google.com <br/>
- * user2,china,yahoo.com
+ * {"userId": "user1", "country": "india", "pageId":"google.com"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com"}
* </li>
* <li>
* Produce some messages to the "adclick-join-input" topic with the same pageKey <br/>
* ./deploy/kafka/bin/kafka-console-producer.sh --topic adclick-join-input --broker-list localhost:9092 <br/>
- * adClickId1,user1,google.com <br/>
- * adClickId2,user1,yahoo.com
+ * {"userId": "user1", "adId": "adClickId1", "pageId":"google.com"} <br/>
+ * {"userId": "user1", "adId": "adClickId2", "pageId":"yahoo.com"}
* </li>
* <li>
- * Consume messages from the "pageview-adclick-join-output" topic (e.g. bin/kafka-console-consumer.sh)
- * ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output <br/>
- * --property print.key=true
+ * Consume messages from the "pageview-adclick-join-output" topic <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output --property print.key=true
* </li>
* </ol>
*
*/
public class PageViewAdClickJoiner implements StreamApplication {
- private static final Logger LOG = LoggerFactory.getLogger(PageViewAdClickJoiner.class);
- private static final String INPUT_TOPIC1 = "pageview-join-input";
- private static final String INPUT_TOPIC2 = "adclick-join-input";
-
+ private static final String PAGEVIEW_TOPIC = "pageview-join-input";
+ private static final String AD_CLICK_TOPIC = "adclick-join-input";
private static final String OUTPUT_TOPIC = "pageview-adclick-join-output";
@Override
public void init(StreamGraph graph, Config config) {
+ StringSerde stringSerde = new StringSerde();
+ JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
+ JsonSerdeV2<AdClick> adClickSerde = new JsonSerdeV2<>(AdClick.class);
+ JsonSerdeV2<JoinResult> joinResultSerde = new JsonSerdeV2<>(JoinResult.class);
+
+ MessageStream<PageView> pageViews = graph.getInputStream(PAGEVIEW_TOPIC, pageViewSerde);
+ MessageStream<AdClick> adClicks = graph.getInputStream(AD_CLICK_TOPIC, adClickSerde);
+ OutputStream<JoinResult> joinResults = graph.getOutputStream(OUTPUT_TOPIC, joinResultSerde);
- MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC1, (k, v) -> v);
- MessageStream<String> adClicks = graph.<String, String, String>getInputStream(INPUT_TOPIC2, (k, v) -> v);
+ JoinFunction<String, PageView, AdClick, JoinResult> pageViewAdClickJoinFunction =
+ new JoinFunction<String, PageView, AdClick, JoinResult>() {
+ @Override
+ public JoinResult apply(PageView pageView, AdClick adClick) {
+ return new JoinResult(pageView.pageId, pageView.userId, pageView.country, adClick.getAdId());
+ }
- OutputStream<String, String, String> outputStream = graph
- .getOutputStream(OUTPUT_TOPIC, m -> "", m -> m);
+ @Override
+ public String getFirstKey(PageView pageView) {
+ return pageView.pageId;
+ }
- Function<String, String> pageViewKeyFn = pageView -> new PageView(pageView).getPageId();
- Function<String, String> adClickKeyFn = adClick -> new AdClick(adClick).getPageId();
+ @Override
+ public String getSecondKey(AdClick adClick) {
+ return adClick.getPageId();
+ }
+ };
- MessageStream<String> pageViewRepartitioned = pageViews.partitionBy(pageViewKeyFn);
- MessageStream<String> adClickRepartitioned = adClicks.partitionBy(adClickKeyFn);
+ MessageStream<PageView> repartitionedPageViews =
+ pageViews
+ .partitionBy(pv -> pv.pageId, pv -> pv, KVSerde.of(stringSerde, pageViewSerde))
+ .map(KV::getValue);
- pageViewRepartitioned.join(adClickRepartitioned, new JoinFunction<String, String, String, String>() {
+ MessageStream<AdClick> repartitionedAdClicks =
+ adClicks
+ .partitionBy(AdClick::getPageId, ac -> ac, KVSerde.of(stringSerde, adClickSerde))
+ .map(KV::getValue);
- @Override
- public String apply(String pageViewMsg, String adClickMsg) {
- PageView pageView = new PageView(pageViewMsg);
- AdClick adClick = new AdClick(adClickMsg);
- String joinResult = String.format("%s,%s,%s", pageView.getPageId(), pageView.getCountry(), adClick.getAdId());
- return joinResult;
- }
+ repartitionedPageViews
+ .join(repartitionedAdClicks, pageViewAdClickJoinFunction,
+ stringSerde, pageViewSerde, adClickSerde, Duration.ofMinutes(3))
+ .sendTo(joinResults);
+ }
- @Override
- public String getFirstKey(String msg) {
- return new PageView(msg).getPageId();
- }
+ static class JoinResult {
+ public String pageId;
+ public String userId;
+ public String country;
+ public String adId;
- @Override
- public String getSecondKey(String msg) {
- return new AdClick(msg).getPageId();
- }
- }, Duration.ofMinutes(3)).sendTo(outputStream);
+ public JoinResult(String pageId, String userId, String country, String adId) {
+ this.pageId = pageId;
+ this.userId = userId;
+ this.country = country;
+ this.adId = adId;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
index cb39553..80ce2d1 100644
--- a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
@@ -20,14 +20,14 @@ package samza.examples.cookbook;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.FilterFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.function.Function;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import samza.examples.cookbook.data.PageView;
/**
* In this example, we demonstrate re-partitioning a stream of page views and filtering out some bad events in the stream.
@@ -39,48 +39,41 @@ import java.util.function.Function;
* <ol>
* <li>
* Ensure that the topic "pageview-filter-input" is created <br/>
- * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1
* </li>
* <li>
- * Run the application using the ./bin/run-app.sh script <br/>
- * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
- * --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties)
+ * Run the application using the run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties
* </li>
* <li>
* Produce some messages to the "pageview-filter-input" topic <br/>
* ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-filter-input --broker-list localhost:9092 <br/>
- * user1,india,google.com <br/>
- * user2,china,yahoo.com
+ * {"userId": "user1", "country": "india", "pageId":"google.com"} <br/>
+ * {"userId": "invalidUserId", "country": "france", "pageId":"facebook.com"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com"}
* </li>
* <li>
* Consume messages from the "pageview-filter-output" topic (e.g. bin/kafka-console-consumer.sh)
- * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output <br/>
- * --property print.key=true </li>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output --property print.key=true
+ * </li>
* </ol>
- *
*/
public class PageViewFilterApp implements StreamApplication {
- private static final Logger LOG = LoggerFactory.getLogger(PageViewFilterApp.class);
- private static final String FILTER_KEY = "badKey";
private static final String INPUT_TOPIC = "pageview-filter-input";
private static final String OUTPUT_TOPIC = "pageview-filter-output";
+ private static final String INVALID_USER_ID = "invalidUserId";
@Override
public void init(StreamGraph graph, Config config) {
+ graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)));
- MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
-
- Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
-
- OutputStream<String, String, String> outputStream = graph
- .getOutputStream(OUTPUT_TOPIC, keyFn, m -> m);
-
- FilterFunction<String> filterFn = pageView -> !FILTER_KEY.equals(new PageView(pageView).getUserId());
+ MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC);
+ OutputStream<KV<String, PageView>> filteredPageViews = graph.getOutputStream(OUTPUT_TOPIC);
pageViews
- .partitionBy(keyFn)
- .filter(filterFn)
- .sendTo(outputStream);
+ .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+ .filter(kv -> !INVALID_USER_ID.equals(kv.value.userId))
+ .sendTo(filteredPageViews);
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
index 7ec4f9d..f1000ae 100644
--- a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
+++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
@@ -20,21 +20,22 @@ package samza.examples.cookbook;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import samza.examples.cookbook.data.PageView;
+import samza.examples.cookbook.data.UserPageViews;
import java.time.Duration;
-import java.util.Collection;
-import java.util.function.Function;
/**
* In this example, we group page views by userId into sessions, and compute the number of page views for each user
- * session. A session is considered closed when there is no user activity for a 3 second duration.
+ * session. A session is considered closed when there is no user activity for a 10 second duration.
*
* <p>Concepts covered: Using session windows to group data in a stream, Re-partitioning a stream.
*
@@ -43,45 +44,50 @@ import java.util.function.Function;
* <ol>
* <li>
* Ensure that the topic "pageview-session-input" is created <br/>
- * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1
* </li>
* <li>
- * Run the application using the ./bin/run-app.sh script <br/>
- * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
- * --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties)
+ * Run the application using the run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties
* </li>
* <li>
* Produce some messages to the "pageview-session-input" topic <br/>
- * user1,india,google.com <br/>
- * user2,china,yahoo.com
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-session-input --broker-list localhost:9092 <br/>
+ * {"userId": "user1", "country": "india", "pageId":"google.com/home"} <br/>
+ * {"userId": "user1", "country": "india", "pageId":"google.com/search"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com/home"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com/sports"} <br/>
+ * {"userId": "user1", "country": "india", "pageId":"google.com/news"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com/fashion"}
* </li>
* <li>
* Consume messages from the "pageview-session-output" topic (e.g. bin/kafka-console-consumer.sh)
- * ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-sessions-output <br/>
- * --property print.key=true
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-session-output --property print.key=true
* </li>
* </ol>
*
*/
public class PageViewSessionizerApp implements StreamApplication {
- private static final Logger LOG = LoggerFactory.getLogger(PageViewSessionizerApp.class);
private static final String INPUT_TOPIC = "pageview-session-input";
private static final String OUTPUT_TOPIC = "pageview-session-output";
@Override
public void init(StreamGraph graph, Config config) {
+ graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)));
- MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
-
- OutputStream<String, String, WindowPane<String, Collection<String>>> outputStream = graph
- .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString());
-
- Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
+ MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC);
+ OutputStream<KV<String, UserPageViews>> userPageViews =
+ graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)));
pageViews
- .partitionBy(keyFn)
- .window(Windows.keyedSessionWindow(keyFn, Duration.ofSeconds(3)))
- .sendTo(outputStream);
+ .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+ .window(Windows.keyedSessionWindow(kv -> kv.value.userId, Duration.ofSeconds(10)))
+ .map(windowPane -> {
+ String userId = windowPane.getKey().getKey();
+ int views = windowPane.getMessage().size();
+ return KV.of(userId, new UserPageViews(userId, views));
+ })
+ .sendTo(userPageViews);
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
index 1bc6ff4..0809180 100644
--- a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
+++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
@@ -20,18 +20,18 @@ package samza.examples.cookbook;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import samza.examples.cookbook.data.PageView;
+import samza.examples.cookbook.data.UserPageViews;
import java.time.Duration;
-import java.util.Collection;
-import java.util.function.Function;
/**
* In this example, we group a stream of page views by country, and compute the number of page views over a tumbling time
@@ -46,45 +46,51 @@ import java.util.function.Function;
* <ol>
* <li>
* Ensure that the topic "pageview-tumbling-input" is created <br/>
- * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1
* </li>
* <li>
- * Run the application using the ./bin/run-app.sh script <br/>
- * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
- * --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties)
+ * Run the application using the run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties
* </li>
* <li>
- * Produce some messages to the "pageview-tumbling-input" topic <br/>
+ * Produce some messages to the "pageview-tumbling-input" topic, waiting for some time between messages <br/>
./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-tumbling-input --broker-list localhost:9092 <br/>
- user1,india,google.com <br/>
- * user2,china,yahoo.com
+ * {"userId": "user1", "country": "india", "pageId":"google.com/home"} <br/>
+ * {"userId": "user1", "country": "india", "pageId":"google.com/search"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com/home"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com/sports"} <br/>
+ * {"userId": "user1", "country": "india", "pageId":"google.com/news"} <br/>
+ * {"userId": "user2", "country": "china", "pageId":"yahoo.com/fashion"}
* </li>
* <li>
* Consume messages from the "pageview-tumbling-output" topic (e.g. bin/kafka-console-consumer.sh)
- * ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output --property print.key=true <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output --property print.key=true <br/>
* </li>
* </ol>
*
*/
public class TumblingPageViewCounterApp implements StreamApplication {
- private static final Logger LOG = LoggerFactory.getLogger(TumblingPageViewCounterApp.class);
private static final String INPUT_TOPIC = "pageview-tumbling-input";
private static final String OUTPUT_TOPIC = "pageview-tumbling-output";
@Override
public void init(StreamGraph graph, Config config) {
+ graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)));
- MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v);
-
- OutputStream<String, String, WindowPane<String, Integer>> outputStream = graph
- .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> m.getMessage().toString());
-
- Function<String, String> keyFn = pageView -> new PageView(pageView).getCountry();
+ MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC);
+ OutputStream<KV<String, UserPageViews>> outputStream =
+ graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)));
pageViews
- .partitionBy(keyFn)
- .window(Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(3), () -> 0, (m, prevCount) -> prevCount + 1))
+ .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+ .window(Windows.keyedTumblingWindow(
+ kv -> kv.key, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1))
+ .map(windowPane -> {
+ String userId = windowPane.getKey().getKey();
+ int views = windowPane.getMessage();
+ return KV.of(userId, new UserPageViews(userId, views));
+ })
.sendTo(outputStream);
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/data/AdClick.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/data/AdClick.java b/src/main/java/samza/examples/cookbook/data/AdClick.java
new file mode 100644
index 0000000..42d45dc
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/data/AdClick.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.cookbook.data;
+
+/**
+ * An ad-click event: records which user clicked which ad on which page.
+ */
+public class AdClick {
+
+  private String pageId; // unique id of the page on which the ad was clicked
+  private String adId;   // unique id of the clicked ad
+  private String userId; // id of the user who clicked the ad
+
+  public String getPageId() {
+    return this.pageId;
+  }
+
+  public void setPageId(String newPageId) {
+    pageId = newPageId;
+  }
+
+  public String getAdId() {
+    return this.adId;
+  }
+
+  public void setAdId(String newAdId) {
+    adId = newAdId;
+  }
+
+  public String getUserId() {
+    return this.userId;
+  }
+
+  public void setUserId(String newUserId) {
+    userId = newUserId;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/data/PageView.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/data/PageView.java b/src/main/java/samza/examples/cookbook/data/PageView.java
new file mode 100644
index 0000000..9640694
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/data/PageView.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook.data;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A page view event. JSON property names match the public field names: userId, country, pageId.
+ */
+public class PageView {
+  public final String userId;
+  public final String country;
+  public final String pageId;
+
+  /**
+   * Constructs a page view event (also used by Jackson as the JSON creator).
+   *
+   * @param pageId the id for the page that was viewed
+   * @param userId the user that viewed the page
+   * @param country the country that the page was viewed from
+   */
+  public PageView(
+      @JsonProperty("pageId") String pageId,
+      @JsonProperty("userId") String userId,
+      @JsonProperty("country") String country) { // was "countryId": must match the serialized field name "country"
+    this.userId = userId;
+    this.country = country;
+    this.pageId = pageId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/data/UserPageViews.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/data/UserPageViews.java b/src/main/java/samza/examples/cookbook/data/UserPageViews.java
new file mode 100644
index 0000000..9e10a14
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/data/UserPageViews.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook.data;
+
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Number of page views attributed to a single user.
+ */
+public class UserPageViews {
+  private final String userId;
+  private final int count;
+
+  /**
+   * Creates a per-user page view count.
+   *
+   * @param id the id of the user whose page views were counted
+   * @param views how many page views were counted for that user
+   */
+  public UserPageViews(
+      @JsonProperty("userId") String id,
+      @JsonProperty("count") int views) {
+    userId = id;
+    count = views;
+  }
+
+  public String getUserId() {
+    return this.userId;
+  }
+
+  public int getCount() {
+    return this.count;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index c320209..736d934 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -20,11 +20,6 @@
package samza.examples.wikipedia.application;
import com.google.common.collect.ImmutableList;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.Counter;
@@ -34,6 +29,8 @@ import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.TaskContext;
import org.slf4j.Logger;
@@ -41,6 +38,12 @@ import org.slf4j.LoggerFactory;
import samza.examples.wikipedia.model.WikipediaParser;
import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
/**
* This {@link StreamApplication} demonstrates the Samza fluent API by performing the same operations as
@@ -82,46 +85,32 @@ public class WikipediaApplication implements StreamApplication {
@Override
public void init(StreamGraph graph, Config config) {
+ // Messages come from WikipediaConsumer so we know that they don't have a key and don't need to be deserialized.
+ graph.setDefaultSerde(new NoOpSerde<>());
+
// Inputs
// Messages come from WikipediaConsumer so we know the type is WikipediaFeedEvent
- // They are un-keyed, so the 'k' parameter to the msgBuilder is not used
- MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
- MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
- MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+ MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID);
+ MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID);
+ MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID);
- // Output (also un-keyed, so no keyExtractor)
- OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats = graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m);
+ // Output (also un-keyed)
+ OutputStream<WikipediaStatsOutput> wikipediaStats =
+ graph.getOutputStream(STATS_STREAM_ID, new JsonSerdeV2<>(WikipediaStatsOutput.class));
// Merge inputs
- MessageStream<WikipediaFeedEvent> allWikipediaEvents = MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
+ MessageStream<WikipediaFeedEvent> allWikipediaEvents =
+ MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
// Parse, update stats, prepare output, and send
- allWikipediaEvents.map(WikipediaParser::parseEvent)
+ allWikipediaEvents
+ .map(WikipediaParser::parseEvent)
.window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()))
.map(this::formatOutput)
.sendTo(wikipediaStats);
}
/**
- * A few statistics about the incoming messages.
- */
- private static class WikipediaStats {
- // Windowed stats
- int edits = 0;
- int byteDiff = 0;
- Set<String> titles = new HashSet<String>();
- Map<String, Integer> counts = new HashMap<String, Integer>();
-
- // Total stats
- int totalEdits = 0;
-
- @Override
- public String toString() {
- return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
- }
- }
-
- /**
* Updates the windowed and total stats based on each "edit" event.
*
* Uses a KeyValueStore to persist a total edit count across restarts.
@@ -177,17 +166,46 @@ public class WikipediaApplication implements StreamApplication {
/**
* Format the stats for output to Kafka.
*/
- private Map<String, Integer> formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
-
+ private WikipediaStatsOutput formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
WikipediaStats stats = statsWindowPane.getMessage();
+ return new WikipediaStatsOutput(
+ stats.edits, stats.totalEdits, stats.byteDiff, stats.titles.size(), stats.counts);
+ }
+
+ /**
+ * A few statistics about the incoming messages.
+ */
+ private static class WikipediaStats {
+ // Windowed stats
+ int edits = 0;
+ int byteDiff = 0;
+ Set<String> titles = new HashSet<>();
+ Map<String, Integer> counts = new HashMap<>();
+
+ // Total stats
+ int totalEdits = 0;
- Map<String, Integer> counts = new HashMap<String, Integer>(stats.counts);
- counts.put("edits", stats.edits);
- counts.put("edits-all-time", stats.totalEdits);
- counts.put("bytes-added", stats.byteDiff);
- counts.put("unique-titles", stats.titles.size());
+ @Override
+ public String toString() {
+ return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
+ }
+ }
- return counts;
+ static class WikipediaStatsOutput {
+ public int edits;
+ public int editsAllTime;
+ public int bytesAdded;
+ public int uniqueTitles;
+ public Map<String, Integer> counts;
+
+ public WikipediaStatsOutput(int edits, int editsAllTime, int bytesAdded, int uniqueTitles,
+ Map<String, Integer> counts) {
+ this.edits = edits;
+ this.editsAllTime = editsAllTime;
+ this.bytesAdded = bytesAdded;
+ this.uniqueTitles = uniqueTitles;
+ this.counts = counts;
+ }
}
}
[09/33] samza-hello-samza git commit: SAMZA-919: Add milliseconds and
thread name to log4j configs.
Posted by xi...@apache.org.
SAMZA-919: Add milliseconds and thread name to log4j configs.
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/70ad4c5a
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/70ad4c5a
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/70ad4c5a
Branch: refs/heads/master
Commit: 70ad4c5af05ee4505f317b4ca55ab1c395b5ad69
Parents: 805ff42
Author: Jacob Maes <ja...@gmail.com>
Authored: Mon Apr 4 14:25:09 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Apr 4 14:25:09 2016 -0700
----------------------------------------------------------------------
src/main/resources/log4j.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/70ad4c5a/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
index 129149f..818135d 100644
--- a/src/main/resources/log4j.xml
+++ b/src/main/resources/log4j.xml
@@ -29,7 +29,7 @@
<param name="MaxFileSize" value="256MB" />
<param name="MaxBackupIndex" value="20" />
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
+ <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
</layout>
</appender>
<root>
[07/33] samza-hello-samza git commit: SAMZA-872: remove
DailyRollingAppender from hello-samza example
Posted by xi...@apache.org.
SAMZA-872: remove DailyRollingAppender from hello-samza example
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/e20ca288
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/e20ca288
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/e20ca288
Branch: refs/heads/master
Commit: e20ca288ee8919ec9515e5d3cc86e3d51e494d45
Parents: 06d6265
Author: Branislav Cogic <b....@levi9.com>
Authored: Fri Feb 19 10:28:24 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Feb 19 10:28:24 2016 -0800
----------------------------------------------------------------------
src/main/resources/log4j.xml | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e20ca288/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
index f0de765..129149f 100644
--- a/src/main/resources/log4j.xml
+++ b/src/main/resources/log4j.xml
@@ -24,9 +24,10 @@
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="jmx" class="org.apache.samza.logging.log4j.JmxAppender" />
- <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender">
+ <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
- <param name="DatePattern" value="'.'yyyy-MM-dd" />
+ <param name="MaxFileSize" value="256MB" />
+ <param name="MaxBackupIndex" value="20" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
</layout>
[10/33] samza-hello-samza git commit: SAMZA-935 - hello-samza add
kafka wait for dependent services
Posted by xi...@apache.org.
SAMZA-935 - hello-samza add kafka wait for dependent services
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/2214946c
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/2214946c
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/2214946c
Branch: refs/heads/master
Commit: 2214946c0b5498f9942e4ecdd4327fea4081b689
Parents: 70ad4c5
Author: Vishal Kuo <vi...@gmail.com>
Authored: Sat Apr 23 12:44:43 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Sat Apr 23 12:56:34 2016 -0700
----------------------------------------------------------------------
bin/grid | 27 +++++++++++++++++++++++++++
1 file changed, 27 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2214946c/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index 042cabe..74ee026 100755
--- a/bin/grid
+++ b/bin/grid
@@ -39,6 +39,12 @@ DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tg
DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
+SERVICE_WAIT_TIMEOUT_SEC=10
+ZOOKEEPER_PORT=2181
+RESOURCEMANAGER_PORT=8032
+NODEMANAGER_PORT=8042
+KAFKA_PORT=9092
+
bootstrap() {
echo "Bootstrapping the system..."
stop_all
@@ -126,6 +132,7 @@ start_zookeeper() {
if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh ]; then
cd $DEPLOY_ROOT_DIR/$SYSTEM
bin/zkServer.sh start
+ wait_for_service "zookeeper" $ZOOKEEPER_PORT
cd - > /dev/null
else
echo 'Zookeeper is not installed. Run: bin/grid install zookeeper'
@@ -135,7 +142,9 @@ start_zookeeper() {
start_yarn() {
if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh ]; then
$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh start resourcemanager
+ wait_for_service "resourcemanager" $RESOURCEMANAGER_PORT
$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh start nodemanager
+ wait_for_service "nodemanager" $NODEMANAGER_PORT
else
echo 'YARN is not installed. Run: bin/grid install yarn'
fi
@@ -147,11 +156,29 @@ start_kafka() {
cd $DEPLOY_ROOT_DIR/$SYSTEM
nohup bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &
cd - > /dev/null
+ wait_for_service "kafka" $KAFKA_PORT
else
echo 'Kafka is not installed. Run: bin/grid install kafka'
fi
}
+wait_for_service() { # usage: wait_for_service NAME PORT -- probes localhost:PORT with nc, sleeping 1s between attempts; exits the script with 1 after SERVICE_WAIT_TIMEOUT_SEC failures
+  local SERVICE_NAME=$1
+  local PORT=$2
+  echo "Waiting for $SERVICE_NAME to start..."
+  local CURRENT_WAIT_TIME=0
+  until nc -w 1 localhost "$PORT" < /dev/null > /dev/null 2>&1; do # no $(...): nc's output must not be executed as a command; /dev/null stdin so nc never reads the terminal
+    printf '.'
+    sleep 1
+    if [ $((++CURRENT_WAIT_TIME)) -ge "$SERVICE_WAIT_TIMEOUT_SEC" ]; then
+      printf '\nError: timed out while waiting for %s to start.\n' "$SERVICE_NAME" >&2
+      exit 1
+    fi
+  done
+  printf '\n'
+  echo "$SERVICE_NAME has started"
+}
+
stop_all() {
$DIR/grid stop kafka
$DIR/grid stop yarn
[17/33] samza-hello-samza git commit: SAMZA-1236: Initial draft of
the fluent API example for tutorials
Posted by xi...@apache.org.
SAMZA-1236: Initial draft of the fluent API example for tutorials
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>
Closes #11 from jmakes/samza-1236
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/3d0e919e
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/3d0e919e
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/3d0e919e
Branch: refs/heads/master
Commit: 3d0e919e6cf96ffab7c2028e5ebef2bd99624346
Parents: c87ed56
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri May 5 16:48:21 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri May 5 16:48:21 2017 -0700
----------------------------------------------------------------------
build.gradle | 1 +
src/main/assembly/src.xml | 5 +
.../config/wikipedia-application.properties | 71 +++++++
src/main/config/wikipedia-parser.properties | 6 -
src/main/config/wikipedia-stats.properties | 6 +
.../application/WikipediaApplication.java | 188 +++++++++++++++++++
.../wikipedia/model/WikipediaParser.java | 80 ++++++++
.../task/WikipediaParserStreamTask.java | 57 +-----
.../task/WikipediaStatsStreamTask.java | 24 ++-
src/main/resources/log4j.xml | 9 +-
10 files changed, 376 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 40505ce..ec451d5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -71,6 +71,7 @@ task distTar(dependsOn: build, type: Tar) {
include "wikipedia-feed.properties"
include "wikipedia-parser.properties"
include "wikipedia-stats.properties"
+ include "wikipedia-application.properties"
// expand the Maven tokens with Gradle equivalents. Also change 'target' (Maven) to 'build/distributions' (Gradle)
filter { String line ->
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index e280a9a..ca90ebf 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -52,6 +52,11 @@
<filtered>true</filtered>
</file>
<file>
+ <source>${basedir}/src/main/config/wikipedia-application.properties</source>
+ <outputDirectory>config</outputDirectory>
+ <filtered>true</filtered>
+ </file>
+ <file>
<source>${basedir}/src/main/config/tumbling-pageview-counter.properties</source>
<outputDirectory>config</outputDirectory>
<filtered>true</filtered>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/config/wikipedia-application.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application.properties b/src/main/config/wikipedia-application.properties
new file mode 100644
index 0000000..59a124f
--- /dev/null
+++ b/src/main/config/wikipedia-application.properties
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=wikipedia-application
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task/Application
+app.runner.class=org.apache.samza.runtime.RemoteApplicationRunner
+app.class=samza.examples.wikipedia.application.WikipediaApplication
+task.window.ms=10000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
+systems.kafka.default.stream.samza.msg.serde=json
+
+# Streams which are not on default system or have special characters in the physical name.
+streams.en-wikipedia.samza.system=wikipedia
+streams.en-wikipedia.samza.physical.name=#en.wikipedia
+
+streams.en-wiktionary.samza.system=wikipedia
+streams.en-wiktionary.samza.physical.name=#en.wiktionary
+
+streams.en-wikinews.samza.system=wikipedia
+streams.en-wikinews.samza.physical.name=#en.wikinews
+
+# Key-value storage
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
+stores.wikipedia-stats.key.serde=string
+stores.wikipedia-stats.msg.serde=integer
+
+# Defaults
+job.default.system=kafka
+
+# Metrics
+metrics.reporters=snapshot,jmx
+metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+metrics.reporter.snapshot.stream=kafka.metrics
+metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
+
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/config/wikipedia-parser.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-parser.properties b/src/main/config/wikipedia-parser.properties
index 6d1e3df..e8f3fa0 100644
--- a/src/main/config/wikipedia-parser.properties
+++ b/src/main/config/wikipedia-parser.properties
@@ -26,12 +26,6 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-
task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
task.inputs=kafka.wikipedia-raw
-# Metrics
-metrics.reporters=snapshot,jmx
-metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
-metrics.reporter.snapshot.stream=kafka.metrics
-metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
-
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/config/wikipedia-stats.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties
index f6b85bf..0a1cf31 100644
--- a/src/main/config/wikipedia-stats.properties
+++ b/src/main/config/wikipedia-stats.properties
@@ -27,6 +27,12 @@ task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask
task.inputs=kafka.wikipedia-edits
task.window.ms=10000
+# Metrics
+metrics.reporters=snapshot,jmx
+metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+metrics.reporter.snapshot.stream=kafka.metrics
+metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
+
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
new file mode 100644
index 0000000..b0779db
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.application;
+
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import samza.examples.wikipedia.model.WikipediaParser;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+
+
+/**
+ * This {@link StreamApplication} demonstrates the Samza fluent API by performing the same operations as
+ * {@link samza.examples.wikipedia.task.WikipediaFeedStreamTask},
+ * {@link samza.examples.wikipedia.task.WikipediaParserStreamTask}, and
+ * {@link samza.examples.wikipedia.task.WikipediaStatsStreamTask} in one expression.
+ *
+ * The only functional difference is the lack of "wikipedia-raw" and "wikipedia-edits"
+ * streams to connect the operators, as they are not needed with the fluent API.
+ *
+ * The application processes Wikipedia events in the following steps:
+ * <ul>
+ * <li>Merge wikipedia, wiktionary, and wikinews events into one stream</li>
+ * <li>Parse each event to a more structured format, dropping events that cannot be parsed</li>
+ * <li>Aggregate some stats over a 10s window</li>
+ * <li>Format each window output for public consumption</li>
+ * <li>Send the window output to Kafka</li>
+ * </ul>
+ *
+ * All of this application logic is defined in the {@link #init(StreamGraph, Config)} method, which
+ * is invoked by the framework to load the application.
+ */
+public class WikipediaApplication implements StreamApplication {
+ private static final Logger log = LoggerFactory.getLogger(WikipediaApplication.class);
+
+ private static final String STATS_STORE_NAME = "wikipedia-stats";
+ private static final String EDIT_COUNT_KEY = "count-edits-all-time";
+
+ private static final String WIKIPEDIA_STREAM_ID = "en-wikipedia";
+ private static final String WIKTIONARY_STREAM_ID = "en-wiktionary";
+ private static final String WIKINEWS_STREAM_ID = "en-wikinews";
+
+ private static final String STATS_STREAM_ID = "wikipedia-stats";
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ // Inputs
+ // Messages come from WikipediaConsumer so we know the type is WikipediaFeedEvent
+ // They are un-keyed, so the 'k' parameter to the msgBuilder is not used
+ MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+ MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+ MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+
+ // Output (also un-keyed, so no keyExtractor)
+ OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats = graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m);
+
+ // Merge inputs
+ MessageStream<WikipediaFeedEvent> allWikipediaEvents = wikipediaEvents.merge(ImmutableList.of(wiktionaryEvents, wikiNewsEvents));
+
+ // Parse (parseEvent returns null for unparseable lines, so drop those), update stats, prepare output, and send
+ allWikipediaEvents.map(WikipediaParser::parseEvent).filter(edit -> edit != null)
+ .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()))
+ .map(this::formatOutput)
+ .sendTo(wikipediaStats);
+ }
+
+ /**
+ * A few statistics about the incoming messages.
+ */
+ private static class WikipediaStats {
+ // Windowed stats
+ int edits = 0;
+ int byteDiff = 0;
+ Set<String> titles = new HashSet<String>();
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+
+ // Total stats
+ int totalEdits = 0;
+
+ @Override
+ public String toString() {
+ return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
+ }
+ }
+
+ /**
+ * Updates the windowed and total stats based on each parsed "edit" event.
+ *
+ * Uses a KeyValueStore to persist a total edit count across restarts.
+ */
+ private class WikipediaStatsAggregator implements FoldLeftFunction<Map<String, Object>, WikipediaStats> {
+
+ private KeyValueStore<String, Integer> store;
+
+ // Example metric. Running counter of the number of repeat edits of the same title within a single window.
+ private Counter repeatEdits;
+
+ /**
+ * {@inheritDoc}
+ * Override {@link org.apache.samza.operators.functions.InitableFunction#init(Config, TaskContext)} to
+ * get a KeyValueStore for persistence and the MetricsRegistry for metrics.
+ */
+ @Override
+ public void init(Config config, TaskContext context) {
+ store = (KeyValueStore<String, Integer>) context.getStore(STATS_STORE_NAME);
+ repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits");
+ }
+
+ @Override
+ public WikipediaStats apply(Map<String, Object> edit, WikipediaStats stats) {
+
+ // Update persisted total
+ Integer editsAllTime = store.get(EDIT_COUNT_KEY);
+ if (editsAllTime == null) editsAllTime = 0;
+ editsAllTime++;
+ store.put(EDIT_COUNT_KEY, editsAllTime);
+
+ // Update window stats
+ stats.edits++;
+ stats.totalEdits = editsAllTime;
+ stats.byteDiff += (Integer) edit.get("diff-bytes");
+ boolean newTitle = stats.titles.add((String) edit.get("title"));
+ // Tally each flag set on this edit: merge() starts an unseen flag at 1 and increments it afterwards
+ Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags");
+ for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
+ if (Boolean.TRUE.equals(flag.getValue())) {
+ stats.counts.merge(flag.getKey(), 1, Integer::sum);
+ }
+ }
+
+ if (!newTitle) {
+ repeatEdits.inc();
+ log.info("Frequent edits for title: {}", edit.get("title"));
+ }
+ return stats;
+ }
+ }
+
+ /**
+ * Format the stats for output to Kafka.
+ */
+ private Map<String, Integer> formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
+
+ WikipediaStats stats = statsWindowPane.getMessage();
+
+ Map<String, Integer> counts = new HashMap<String, Integer>(stats.counts);
+ counts.put("edits", stats.edits);
+ counts.put("edits-all-time", stats.totalEdits);
+ counts.put("bytes-added", stats.byteDiff);
+ counts.put("unique-titles", stats.titles.size());
+
+ return counts;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java b/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java
new file mode 100644
index 0000000..9347962
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.model;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import samza.examples.wikipedia.system.WikipediaFeed;
+
+
+public class WikipediaParser {
+ // Matches "[[title]] flags diff-url * user * (+/-bytes) summary"; compiled once rather than on every call.
+ private static final Pattern EDIT_PATTERN = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)");
+
+ /** Parses the event's raw line and adds its channel, source and time; on failure logs to stderr and returns null. */
+ public static Map<String, Object> parseEvent(WikipediaFeed.WikipediaFeedEvent wikipediaFeedEvent) {
+ try {
+ Map<String, Object> parsedJsonObject = WikipediaParser.parseLine(wikipediaFeedEvent.getRawEvent());
+ parsedJsonObject.put("channel", wikipediaFeedEvent.getChannel());
+ parsedJsonObject.put("source", wikipediaFeedEvent.getSource());
+ parsedJsonObject.put("time", wikipediaFeedEvent.getTime());
+ return parsedJsonObject;
+ } catch (Exception e) {
+ // Best effort: callers are expected to skip null results. Include the cause so failures are diagnosable.
+ System.err.println("Unable to parse line: " + wikipediaFeedEvent + " (" + e + ")");
+ return null;
+ }
+ }
+
+ /** Parses one raw edit line into a map; throws IllegalArgumentException if it does not match EDIT_PATTERN. */
+ public static Map<String, Object> parseLine(String line) {
+ Matcher m = EDIT_PATTERN.matcher(line);
+ if (!m.find()) {
+ throw new IllegalArgumentException("Line does not match the expected edit format: " + line);
+ }
+ String title = m.group(1);
+ String flags = m.group(2);
+ String diffUrl = m.group(3);
+ String user = m.group(4);
+ int byteDiff = Integer.parseInt(m.group(5));
+ String summary = m.group(6);
+
+ // Flag letters from the feed: M = minor, N = new, ! = unpatrolled, B = bot edit.
+ Map<String, Boolean> flagMap = new HashMap<String, Boolean>();
+ flagMap.put("is-minor", flags.contains("M"));
+ flagMap.put("is-new", flags.contains("N"));
+ flagMap.put("is-unpatrolled", flags.contains("!"));
+ flagMap.put("is-bot-edit", flags.contains("B"));
+ flagMap.put("is-special", title.startsWith("Special:"));
+ flagMap.put("is-talk", title.startsWith("Talk:"));
+
+ Map<String, Object> root = new HashMap<String, Object>();
+ root.put("title", title);
+ root.put("user", user);
+ root.put("unparsed-flags", flags);
+ root.put("diff-bytes", byteDiff);
+ root.put("diff-url", diffUrl);
+ root.put("summary", summary);
+ root.put("flags", flagMap);
+ return root;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
index 0505f58..aee8939 100644
--- a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
@@ -19,72 +19,29 @@
package samza.examples.wikipedia.task;
-import java.util.HashMap;
import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
+import samza.examples.wikipedia.model.WikipediaParser;
import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
public class WikipediaParserStreamTask implements StreamTask {
+ private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-edits");
+
@SuppressWarnings("unchecked")
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
- try {
- Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
-
- parsedJsonObject.put("channel", event.getChannel());
- parsedJsonObject.put("source", event.getSource());
- parsedJsonObject.put("time", event.getTime());
-
- collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
- } catch (Exception e) {
- System.err.println("Unable to parse line: " + event);
- }
- }
-
- public static Map<String, Object> parse(String line) {
- Pattern p = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)");
- Matcher m = p.matcher(line);
-
- if (m.find() && m.groupCount() == 6) {
- String title = m.group(1);
- String flags = m.group(2);
- String diffUrl = m.group(3);
- String user = m.group(4);
- int byteDiff = Integer.parseInt(m.group(5));
- String summary = m.group(6);
-
- Map<String, Boolean> flagMap = new HashMap<String, Boolean>();
-
- flagMap.put("is-minor", flags.contains("M"));
- flagMap.put("is-new", flags.contains("N"));
- flagMap.put("is-unpatrolled", flags.contains("!"));
- flagMap.put("is-bot-edit", flags.contains("B"));
- flagMap.put("is-special", title.startsWith("Special:"));
- flagMap.put("is-talk", title.startsWith("Talk:"));
-
- Map<String, Object> root = new HashMap<String, Object>();
-
- root.put("title", title);
- root.put("user", user);
- root.put("unparsed-flags", flags);
- root.put("diff-bytes", byteDiff);
- root.put("diff-url", diffUrl);
- root.put("summary", summary);
- root.put("flags", flagMap);
+ Map<String, Object> parsedJsonObject = WikipediaParser.parseEvent(event);
- return root;
- } else {
- throw new IllegalArgumentException();
+ if (parsedJsonObject != null) {
+ collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, parsedJsonObject));
}
}
@@ -92,7 +49,7 @@ public class WikipediaParserStreamTask implements StreamTask {
String[] lines = new String[] { "[[Wikipedia talk:Articles for creation/Lords of War]] http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]", "[[David Shepard (surgeon)]] M http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * Jacobsievers * (+115) /* American Revolution (1775�1783) */ Added to note regarding David Shepard's brothers" };
for (String line : lines) {
- System.out.println(parse(line));
+ System.out.println(WikipediaParser.parseLine(line));
}
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
index 60fd93d..abe760a 100644
--- a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -36,14 +37,20 @@ import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;
public class WikipediaStatsStreamTask implements StreamTask, InitableTask, WindowableTask {
+ private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-stats");
+
private int edits = 0;
private int byteDiff = 0;
private Set<String> titles = new HashSet<String>();
private Map<String, Integer> counts = new HashMap<String, Integer>();
private KeyValueStore<String, Integer> store;
+ // Example metric. Running counter of the number of repeat edits of the same title within a single window.
+ private Counter repeatEdits;
+
public void init(Config config, TaskContext context) {
this.store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats");
+ this.repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits");
}
@SuppressWarnings("unchecked")
@@ -57,21 +64,18 @@ public class WikipediaStatsStreamTask implements StreamTask, InitableTask, Windo
store.put("count-edits-all-time", editsAllTime + 1);
edits += 1;
- titles.add((String) edit.get("title"));
byteDiff += (Integer) edit.get("diff-bytes");
+ boolean newTitle = titles.add((String) edit.get("title"));
for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
if (Boolean.TRUE.equals(flag.getValue())) {
- Integer count = counts.get(flag.getKey());
-
- if (count == null) {
- count = 0;
- }
-
- count += 1;
- counts.put(flag.getKey(), count);
+ counts.compute(flag.getKey(), (k, v) -> v == null ? 0 : v + 1);
}
}
+
+ if (!newTitle) {
+ repeatEdits.inc();
+ }
}
@Override
@@ -81,7 +85,7 @@ public class WikipediaStatsStreamTask implements StreamTask, InitableTask, Windo
counts.put("unique-titles", titles.size());
counts.put("edits-all-time", store.get("count-edits-all-time"));
- collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-stats"), counts));
+ collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, counts));
// Reset counts after windowing.
edits = 0;
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
index 086d6b8..805d5ca 100644
--- a/src/main/resources/log4j.xml
+++ b/src/main/resources/log4j.xml
@@ -40,14 +40,13 @@
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
</layout>
</appender>
+ <logger name="STARTUP_LOGGER" additivity="false">
+ <level value="info" />
+ <appender-ref ref="StartupAppender"/>
+ </logger>
<root>
<priority value="info" />
<appender-ref ref="RollingAppender"/>
<appender-ref ref="jmx" />
</root>
- <logger name="STARTUP_LOGGER" additivity="false">
- <level value="info" />
- <appender-ref ref="StartupAppender"/>
- </logger>
-
</log4j:configuration>
[13/33] samza-hello-samza git commit: SAMZA-1002 - Add mavenLocal()
to build.gradle in hello-samza latest branch
Posted by xi...@apache.org.
SAMZA-1002 - Add mavenLocal() to build.gradle in hello-samza latest branch
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/32c90771
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/32c90771
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/32c90771
Branch: refs/heads/master
Commit: 32c90771a4cc6ad83a9b81ef52706360e20b4fd0
Parents: b240b65
Author: Navina Ramesh <nr...@linkedin.com>
Authored: Thu Aug 18 10:30:05 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Thu Aug 18 10:30:05 2016 -0700
----------------------------------------------------------------------
build.gradle | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/32c90771/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 1a59780..e838b18 100644
--- a/build.gradle
+++ b/build.gradle
@@ -30,6 +30,7 @@ task wrapper(type: Wrapper) {
version = "$SAMZA_VERSION"
repositories {
+ mavenLocal()
mavenCentral()
maven { url "https://repository.apache.org/content/groups/public" }
}
[11/33] samza-hello-samza git commit: SAMZA-989: update hello-samza
to use the startup logger
Posted by xi...@apache.org.
SAMZA-989: update hello-samza to use the startup logger
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/31244946
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/31244946
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/31244946
Branch: refs/heads/master
Commit: 31244946a7a333aa712e629dcc6322c37e77d936
Parents: 2214946
Author: Jacob Maes <ja...@gmail.com>
Authored: Mon Aug 1 12:00:14 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Aug 1 12:00:14 2016 -0700
----------------------------------------------------------------------
src/main/resources/log4j.xml | 13 +++++++++++++
1 file changed, 13 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/31244946/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
index 818135d..086d6b8 100644
--- a/src/main/resources/log4j.xml
+++ b/src/main/resources/log4j.xml
@@ -32,9 +32,22 @@
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
</layout>
</appender>
+ <appender name="StartupAppender" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="${samza.log.dir}/${samza.container.name}-startup.log" />
+ <param name="MaxFileSize" value="256MB" />
+ <param name="MaxBackupIndex" value="1" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+ </layout>
+ </appender>
<root>
<priority value="info" />
<appender-ref ref="RollingAppender"/>
<appender-ref ref="jmx" />
</root>
+ <logger name="STARTUP_LOGGER" additivity="false">
+ <level value="info" />
+ <appender-ref ref="StartupAppender"/>
+ </logger>
+
</log4j:configuration>
[26/33] samza-hello-samza git commit: Update the code based on latest
samza
Posted by xi...@apache.org.
Update the code based on latest samza
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/48c858ac
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/48c858ac
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/48c858ac
Branch: refs/heads/master
Commit: 48c858ac0343d6ae6a1f1ce4451d6ddab65be903
Parents: e5943a0
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Authored: Thu Oct 26 17:24:16 2017 -0700
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Thu Oct 26 17:24:16 2017 -0700
----------------------------------------------------------------------
bin/deploy.sh | 26 +++++++++++
.../cookbook/PageViewAdClickJoiner.java | 6 +--
.../examples/cookbook/PageViewFilterApp.java | 2 +-
.../cookbook/PageViewSessionizerApp.java | 23 +++++----
.../cookbook/TumblingPageViewCounterApp.java | 14 +++---
.../application/WikipediaApplication.java | 49 +++++++++++++++++++-
6 files changed, 100 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/bin/deploy.sh
----------------------------------------------------------------------
diff --git a/bin/deploy.sh b/bin/deploy.sh
new file mode 100755
index 0000000..51faed1
--- /dev/null
+++ b/bin/deploy.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Builds hello-samza and unpacks its distribution tarball into deploy/samza; stops at the first failing command.
+set -e
+base_dir="$(cd "$(dirname "$0")/.." && pwd)"
+cd "$base_dir"
+
+mvn clean package
+mkdir -p "$base_dir/deploy/samza"
+tar -xvf "$base_dir"/target/hello-samza-*-dist.tar.gz -C "$base_dir/deploy/samza"
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
index 4f491f7..f6c3810 100644
--- a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
+++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
@@ -107,17 +107,17 @@ public class PageViewAdClickJoiner implements StreamApplication {
MessageStream<PageView> repartitionedPageViews =
pageViews
- .partitionBy(pv -> pv.pageId, pv -> pv, KVSerde.of(stringSerde, pageViewSerde))
+ .partitionBy(pv -> pv.pageId, pv -> pv, KVSerde.of(stringSerde, pageViewSerde), "pageview")
.map(KV::getValue);
MessageStream<AdClick> repartitionedAdClicks =
adClicks
- .partitionBy(AdClick::getPageId, ac -> ac, KVSerde.of(stringSerde, adClickSerde))
+ .partitionBy(AdClick::getPageId, ac -> ac, KVSerde.of(stringSerde, adClickSerde), "adclick")
.map(KV::getValue);
repartitionedPageViews
.join(repartitionedAdClicks, pageViewAdClickJoinFunction,
- stringSerde, pageViewSerde, adClickSerde, Duration.ofMinutes(3))
+ stringSerde, pageViewSerde, adClickSerde, Duration.ofMinutes(3), "join")
.sendTo(joinResults);
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
index 80ce2d1..a2accfd 100644
--- a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
@@ -72,7 +72,7 @@ public class PageViewFilterApp implements StreamApplication {
OutputStream<KV<String, PageView>> filteredPageViews = graph.getOutputStream(OUTPUT_TOPIC);
pageViews
- .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+ .partitionBy(kv -> kv.value.userId, kv -> kv.value, "pageview")
.filter(kv -> !INVALID_USER_ID.equals(kv.value.userId))
.sendTo(filteredPageViews);
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
index f1000ae..2bcd9f5 100644
--- a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
+++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
@@ -27,11 +27,13 @@ import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerde;
import samza.examples.cookbook.data.PageView;
import samza.examples.cookbook.data.UserPageViews;
import java.time.Duration;
+import java.util.function.Function;
/**
* In this example, we group page views by userId into sessions, and compute the number of page views for each user
@@ -74,20 +76,25 @@ public class PageViewSessionizerApp implements StreamApplication {
@Override
public void init(StreamGraph graph, Config config) {
- graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)));
+ Serde<String> stringSerde = new StringSerde();
+ Serde<PageView> pageviewSerde = new JsonSerdeV2<>(PageView.class);
+ KVSerde<String, PageView> pageViewKVSerde = KVSerde.of(stringSerde, pageviewSerde);
+ Serde<UserPageViews> userPageviewSerde = new JsonSerdeV2<>(UserPageViews.class);
+ graph.setDefaultSerde(pageViewKVSerde);
MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC);
OutputStream<KV<String, UserPageViews>> userPageViews =
- graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)));
+ graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(stringSerde, userPageviewSerde));
pageViews
- .partitionBy(kv -> kv.value.userId, kv -> kv.value)
- .window(Windows.keyedSessionWindow(kv -> kv.value.userId, Duration.ofSeconds(10)))
+ .partitionBy(kv -> kv.value.userId, kv -> kv.value, "pageview")
+ .window(Windows.keyedSessionWindow(kv -> kv.value.userId,
+ Duration.ofSeconds(10), stringSerde, pageViewKVSerde), "usersession")
.map(windowPane -> {
- String userId = windowPane.getKey().getKey();
- int views = windowPane.getMessage().size();
- return KV.of(userId, new UserPageViews(userId, views));
- })
+ String userId = windowPane.getKey().getKey();
+ int views = windowPane.getMessage().size();
+ return KV.of(userId, new UserPageViews(userId, views));
+ })
.sendTo(userPageViews);
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
index 0809180..acf1411 100644
--- a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
+++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
@@ -25,6 +25,7 @@ import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
@@ -83,14 +84,15 @@ public class TumblingPageViewCounterApp implements StreamApplication {
graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)));
pageViews
- .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+ .partitionBy(kv -> kv.value.userId, kv -> kv.value, "userId")
.window(Windows.keyedTumblingWindow(
- kv -> kv.key, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1))
+ kv -> kv.key, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1,
+ new StringSerde(), new IntegerSerde()), "count")
.map(windowPane -> {
- String userId = windowPane.getKey().getKey();
- int views = windowPane.getMessage();
- return KV.of(userId, new UserPageViews(userId, views));
- })
+ String userId = windowPane.getKey().getKey();
+ int views = windowPane.getMessage();
+ return KV.of(userId, new UserPageViews(userId, views));
+ })
.sendTo(outputStream);
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index 736d934..659e373 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -31,6 +31,7 @@ import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.TaskContext;
import org.slf4j.Logger;
@@ -38,6 +39,12 @@ import org.slf4j.LoggerFactory;
import samza.examples.wikipedia.model.WikipediaParser;
import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
@@ -105,7 +112,8 @@ public class WikipediaApplication implements StreamApplication {
// Parse, update stats, prepare output, and send
allWikipediaEvents
.map(WikipediaParser::parseEvent)
- .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()))
+ .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new,
+ new WikipediaStatsAggregator(), WikipediaStats.serde()), "Tumbling window of WikipediaStats")
.map(this::formatOutput)
.sendTo(wikipediaStats);
}
@@ -175,7 +183,7 @@ public class WikipediaApplication implements StreamApplication {
/**
* A few statistics about the incoming messages.
*/
- private static class WikipediaStats {
+ public static class WikipediaStats {
// Windowed stats
int edits = 0;
int byteDiff = 0;
@@ -189,6 +197,43 @@ public class WikipediaApplication implements StreamApplication {
public String toString() {
return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
}
+
+ static Serde<WikipediaStats> serde() {
+ return new WikipediaStatsSerde();
+ }
+
+ public static class WikipediaStatsSerde implements Serde<WikipediaStats> {
+ @Override
+ public WikipediaStats fromBytes(byte[] bytes) {
+ try {
+ ByteArrayInputStream bias = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bias);
+ WikipediaStats stats = new WikipediaStats();
+ stats.edits = ois.readInt();
+ stats.byteDiff = ois.readInt();
+ stats.titles = (Set<String>) ois.readObject();
+ stats.counts = (Map<String, Integer>) ois.readObject();
+ return stats;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public byte[] toBytes(WikipediaStats wikipediaStats) {
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream dos = new ObjectOutputStream(baos);
+ dos.writeInt(wikipediaStats.edits);
+ dos.writeInt(wikipediaStats.byteDiff);
+ dos.writeObject(wikipediaStats.titles);
+ dos.writeObject(wikipediaStats.counts);
+ return baos.toByteArray();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
static class WikipediaStatsOutput {
[08/33] samza-hello-samza git commit: SAMZA-884: fix jackson dependency in hello-samza
Posted by xi...@apache.org.
SAMZA-884: fix jackson dependency in hello-samza
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/805ff426
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/805ff426
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/805ff426
Branch: refs/heads/master
Commit: 805ff4269c37a76480e7059738d113dca1d50a7e
Parents: e20ca28
Author: Yi Pan <ni...@gmail.com>
Authored: Fri Mar 4 11:50:09 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Mar 4 11:50:09 2016 -0800
----------------------------------------------------------------------
build.gradle | 2 +-
pom.xml | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/805ff426/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index cbbd2b3..1a59780 100644
--- a/build.gradle
+++ b/build.gradle
@@ -40,7 +40,7 @@ configurations {
}
dependencies {
- compile(group: 'org.codehaus.jackson', name: 'jackson-jaxrs', version: '1.8.5')
+ compile(group: 'org.codehaus.jackson', name: 'jackson-jaxrs', version: '1.9.13')
compile(group: 'org.slf4j', name: 'slf4j-api', version: "$SLF4J_VERSION")
compile(group: 'org.slf4j', name: 'slf4j-log4j12', version: "$SLF4J_VERSION")
compile(group: 'org.schwering', name: 'irclib', version: '1.10')
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/805ff426/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 552c2a9..9745c93 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,7 +101,7 @@ under the License.
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
- <version>1.8.5</version>
+ <version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
[24/33] samza-hello-samza git commit: Updated self and samza dependency versions.
Posted by xi...@apache.org.
Updated self and samza dependency versions.
'latest' branch should point to the current samza master version, which is 0.13.1-SNAPSHOT.
Updated the samza-hello-samza version to 0.13.1-SNAPSHOT as well, although that probably isn't relevant since we don't publish this artifact (right?).
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jacob Maes <jm...@linkedin.com>
Closes #22 from prateekm/version-update
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/901c3a39
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/901c3a39
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/901c3a39
Branch: refs/heads/master
Commit: 901c3a390b7a6ffe60ffa6d1367681607fcc8203
Parents: 373b3a0
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Jun 14 15:22:20 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Wed Jun 14 15:22:20 2017 -0700
----------------------------------------------------------------------
bin/grid | 1 +
gradle.properties | 2 +-
pom.xml | 4 ++--
3 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/901c3a39/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index 5dff403..5281379 100755
--- a/bin/grid
+++ b/bin/grid
@@ -79,6 +79,7 @@ install_all_without_yarn() {
}
install_samza() {
+ echo "Building samza from master..."
mkdir -p "$DEPLOY_ROOT_DIR"
if [ -d "$DOWNLOAD_CACHE_DIR/samza/.git" ]; then
pushd "$DOWNLOAD_CACHE_DIR/samza"
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/901c3a39/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 294875b..800e15b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,7 +17,7 @@
* under the License.
*/
-SAMZA_VERSION=0.13.0-SNAPSHOT
+SAMZA_VERSION=0.13.1-SNAPSHOT
KAFKA_VERSION=0.10.1.1
HADOOP_VERSION=2.6.1
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/901c3a39/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9a0b54e..ada4c2b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
<groupId>org.apache.samza</groupId>
<artifactId>hello-samza</artifactId>
- <version>0.13.0</version>
+ <version>0.13.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Samza Example</name>
<description>
@@ -143,7 +143,7 @@ under the License.
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>0.13.0-SNAPSHOT</samza.version>
+ <samza.version>0.13.1-SNAPSHOT</samza.version>
<hadoop.version>2.6.1</hadoop.version>
</properties>
[02/33] samza-hello-samza git commit: SAMZA-847: update latest to use 0.10.1-SNAPSHOT
Posted by xi...@apache.org.
SAMZA-847: update latest to use 0.10.1-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/28d1ca17
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/28d1ca17
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/28d1ca17
Branch: refs/heads/master
Commit: 28d1ca175a08669f7b0b8665804b0a0252652604
Parents: 3be8dbd
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Mon Dec 21 17:05:18 2015 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Dec 21 17:05:18 2015 -0800
----------------------------------------------------------------------
gradle.properties | 4 ++--
pom.xml | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/28d1ca17/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 83b150f..b7729bd 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,9 +17,9 @@
* under the License.
*/
-SAMZA_VERSION=0.9.0
+SAMZA_VERSION=0.10.1-SNAPSHOT
KAFKA_VERSION=0.8.2.1
-HADOOP_VERSION=2.4.0
+HADOOP_VERSION=2.6.1
SLF4J_VERSION = 1.7.7
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/28d1ca17/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f8cd7f7..e541bb7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
<groupId>org.apache.samza</groupId>
<artifactId>hello-samza</artifactId>
- <version>0.10.0</version>
+ <version>0.10.1</version>
<packaging>jar</packaging>
<name>Samza Example</name>
<description>
@@ -113,7 +113,7 @@ under the License.
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>0.10.0-SNAPSHOT</samza.version>
+ <samza.version>0.10.1-SNAPSHOT</samza.version>
</properties>
<developers>
[23/33] samza-hello-samza git commit: Update readme
Posted by xi...@apache.org.
Update readme
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Jacob Maes <jm...@linkedin.com>
Closes #20 from jmakes/latest
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/373b3a09
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/373b3a09
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/373b3a09
Branch: refs/heads/master
Commit: 373b3a09e3d11a7c87fdd4a63024743b2c601453
Parents: 132a17f
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri Jun 9 16:33:10 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri Jun 9 16:33:10 2017 -0700
----------------------------------------------------------------------
README.md | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/373b3a09/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 4390006..0f80e9e 100644
--- a/README.md
+++ b/README.md
@@ -3,12 +3,12 @@ hello-samza
Hello Samza is a starter project for [Apache Samza](http://samza.apache.org/) jobs.
-Please see [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) to get started.
+Please see [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) to get started.
### Pull requests and questions
-[Hello Samza](http://samza.apache.org/startup/hello-samza/0.10/) is developed as part of the [Apache Samza](http://samza.apache.org) project. Please direct questions, improvements and bug fixes there. Questions about [Hello Samza](http://samza.apache.org/startup/hello-samza/0.9/) are welcome on the [dev list](http://samza.apache.org/community/mailing-lists.html) and the [Samza JIRA](https://issues.apache.org/jira/browse/SAMZA) has a hello-samza component for filing tickets.
+[Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) is developed as part of the [Apache Samza](http://samza.apache.org) project. Please direct questions, improvements and bug fixes there. Questions about [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) are welcome on the [dev list](http://samza.apache.org/community/mailing-lists.html) and the [Samza JIRA](https://issues.apache.org/jira/browse/SAMZA) has a hello-samza component for filing tickets.
### Contribution
-To start contributing on [Hello Samza](http://samza.apache.org/startup/hello-samza/0.10/) first read [Rules](http://samza.apache.org/contribute/rules.html) and [Contributor Corner](https://cwiki.apache.org/confluence/display/SAMZA/Contributor%27s+Corner). Notice that [Hello Samza](http://samza.apache.org/startup/hello-samza/0.10/) git repository does not support git pull request.
+To start contributing on [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) first read [Rules](http://samza.apache.org/contribute/rules.html) and [Contributor Corner](https://cwiki.apache.org/confluence/display/SAMZA/Contributor%27s+Corner). Notice that [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) git repository does not support git pull request.
[05/33] samza-hello-samza git commit: SAMZA-730: added metrics serde for the metrics stream (was using json serde)
Posted by xi...@apache.org.
SAMZA-730: added metrics serde for the metrics stream (was using json serde)
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/6c262408
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/6c262408
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/6c262408
Branch: refs/heads/master
Commit: 6c26240811b92a78460f02df0b4cc6d1c3fcade4
Parents: c1c153f
Author: Steven Aerts <st...@gmail.com>
Authored: Mon Jul 13 14:58:09 2015 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Dec 21 23:02:18 2015 -0800
----------------------------------------------------------------------
src/main/config/wikipedia-parser.properties | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/6c262408/src/main/config/wikipedia-parser.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-parser.properties b/src/main/config/wikipedia-parser.properties
index 8f1086f..6d1e3df 100644
--- a/src/main/config/wikipedia-parser.properties
+++ b/src/main/config/wikipedia-parser.properties
@@ -39,6 +39,7 @@ serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotS
# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
+systems.kafka.streams.metrics.samza.msg.serde=metrics
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.bootstrap.servers=localhost:9092
[21/33] samza-hello-samza git commit: SAMZA-1295: Add a script to merge pull requests in the hello-samza project
Posted by xi...@apache.org.
SAMZA-1295: Add a script to merge pull requests in the hello-samza project
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Jagadish <jv...@linkedin.com>
Closes #16 from jmakes/samza-1295
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/35e017c8
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/35e017c8
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/35e017c8
Branch: refs/heads/master
Commit: 35e017c8e9ede1048f4e9efa715597030f8d15eb
Parents: 28af952
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri May 19 10:15:38 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri May 19 10:15:38 2017 -0700
----------------------------------------------------------------------
bin/merge-pull-request.py | 508 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 508 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/35e017c8/bin/merge-pull-request.py
----------------------------------------------------------------------
diff --git a/bin/merge-pull-request.py b/bin/merge-pull-request.py
new file mode 100755
index 0000000..fc5979b
--- /dev/null
+++ b/bin/merge-pull-request.py
@@ -0,0 +1,508 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Utility for creating well-formed pull request merges and pushing them to Apache. This script is a modified version
+of the one created by the Kafka project (https://github.com/apache/kafka/blob/trunk/kafka-merge-pr.py).
+
+ Usage: ./samza-merge-pr.py (see config env vars below)
+
+This utility assumes you already have local a samza git folder and that you have added remotes corresponding to both:
+ (i) the github apache samza mirror and
+ (ii) the apache samza git repo.
+
+Note:
+ This script has been borrowed from the Apache Kafka team that can be found here:
+ https://github.com/apache/kafka/blob/trunk/kafka-merge-pr.py
+ It has been modified for the Apache Samza project.
+"""
+
+import json
+import os
+import re
+import subprocess
+import sys
+import urllib2
+
+try:
+ import jira.client
+
+ JIRA_IMPORTED = True
+except ImportError:
+ JIRA_IMPORTED = False
+
+PROJECT_NAME = "samza-hello-samza"
+
+# JIRA project name
+CAPITALIZED_PROJECT_NAME = "samza".upper()
+
+# Remote name which points to the GitHub site
+PR_REMOTE_NAME = os.environ.get("PR_REMOTE_NAME", "samza-github")
+# Remote name which points to Apache git
+PUSH_REMOTE_NAME = os.environ.get("PUSH_REMOTE_NAME", "samza-apache")
+# ASF JIRA username
+JIRA_USERNAME = os.environ.get("JIRA_USERNAME", "")
+# ASF JIRA password
+JIRA_PASSWORD = os.environ.get("JIRA_PASSWORD", "")
+
+"""
+OAuth key used for issuing requests against the GitHub API. If this is not defined, then requests
+will be unauthenticated. You should only need to configure this if you find yourself regularly
+exceeding your IP's unauthenticated request rate limit. You can create an OAuth key at
+https://github.com/settings/tokens. This script only requires the "public_repo" scope.
+"""
+GITHUB_OAUTH_KEY = os.environ.get("GITHUB_OAUTH_KEY", "")
+
+GITHUB_BASE = "https://github.com/apache/%s/pull" % (PROJECT_NAME)
+GITHUB_API_BASE = "https://api.github.com/repos/apache/%s" % (PROJECT_NAME)
+JIRA_BASE = "https://issues.apache.org/jira/browse"
+JIRA_API_BASE = "https://issues.apache.org/jira"
+# Prefix added to temporary branches
+TEMP_BRANCH_PREFIX = "PR_TOOL"
+
+
+def get_json(url):
+ try:
+ request = urllib2.Request(url)
+ if GITHUB_OAUTH_KEY:
+ request.add_header('Authorization', 'token %s' % GITHUB_OAUTH_KEY)
+ return json.load(urllib2.urlopen(request))
+ except urllib2.HTTPError as e:
+ if "X-RateLimit-Remaining" in e.headers and e.headers["X-RateLimit-Remaining"] == '0':
+ print "Exceeded the GitHub API rate limit; see the instructions in " + \
+ "samza-merge-pr.py to configure an OAuth token for making authenticated " + \
+ "GitHub requests."
+ else:
+ print "Unable to fetch URL, exiting: %s" % url
+ sys.exit(-1)
+
+
+def fail(msg):
+ print msg
+ clean_up()
+ sys.exit(-1)
+
+
+def run_cmd(cmd):
+ if isinstance(cmd, list):
+ # Add debugging information on the command being run.
+ print " ".join(cmd)
+ return subprocess.check_output(cmd)
+ else:
+ print cmd
+ return subprocess.check_output(cmd.split(" "))
+
+
+def continue_maybe(prompt):
+ result = raw_input("\n%s (y/n): " % prompt)
+ if result.lower() != "y":
+ fail("Okay, exiting")
+
+
+def clean_up():
+ if original_head != get_current_branch():
+ print "Restoring head pointer to %s" % original_head
+ run_cmd("git checkout %s" % original_head)
+
+ branches = run_cmd("git branch").replace(" ", "").split("\n")
+
+ for branch in filter(lambda x: x.startswith(TEMP_BRANCH_PREFIX), branches):
+ print "Deleting local branch %s" % branch
+ run_cmd("git branch -D %s" % branch)
+
+
+def get_current_branch():
+ return run_cmd("git rev-parse --abbrev-ref HEAD").replace("\n", "")
+
+def merge_pr(pr_num, target_ref, title, body, pr_repo_desc):
+ """
+ Merge the requested Pull Request number
+ Returns the Merge Hash
+
+ :param pr_num: Pull Request Number
+ :param target_ref:
+ :param title: Commit title
+ :param body:
+ :param pr_repo_desc:
+ :return:
+ """
+ pr_branch_name = "%s_MERGE_PR_%s" % (TEMP_BRANCH_PREFIX, pr_num)
+ target_branch_name = "%s_MERGE_PR_%s_%s" % (TEMP_BRANCH_PREFIX, pr_num, target_ref.upper())
+ run_cmd("git fetch %s pull/%s/head:%s" % (PR_REMOTE_NAME, pr_num, pr_branch_name))
+ run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, target_ref, target_branch_name))
+ run_cmd("git checkout %s" % target_branch_name)
+
+ had_conflicts = False
+ try:
+ run_cmd(['git', 'merge', pr_branch_name, '--squash'])
+ except Exception as e:
+ msg = "Error merging: %s\nWould you like to manually fix-up this merge?" % e
+ continue_maybe(msg)
+ msg = "Okay, please fix any conflicts and 'git add' conflicting files... Finished?"
+ continue_maybe(msg)
+ had_conflicts = True
+
+ commit_authors = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
+ '--pretty=format:%an <%ae>']).split("\n")
+ distinct_authors = sorted(set(commit_authors),
+ key=lambda x: commit_authors.count(x), reverse=True)
+ primary_author = raw_input(
+ "Enter primary author in the format of \"name <email>\" [%s]: " %
+ distinct_authors[0])
+ if primary_author == "":
+ primary_author = distinct_authors[0]
+
+ reviewers = raw_input(
+ "Enter reviewers in the format of \"name1 <email1>, name2 <email2>\": ").strip()
+
+ commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
+ '--pretty=format:%h [%an] %s']).split("\n")
+
+ if len(commits) > 1:
+ result = raw_input("List pull request commits in squashed commit message? (y/n): ")
+ if result.lower() == "y":
+ should_list_commits = True
+ else:
+ should_list_commits = False
+ else:
+ should_list_commits = False
+
+ merge_message_flags = []
+
+ merge_message_flags += ["-m", title]
+ if body is not None:
+ # We remove @ symbols from the body to avoid triggering e-mails
+ # to people every time someone creates a public fork of the project.
+ merge_message_flags += ["-m", body.replace("@", "")]
+
+ authors = "\n".join(["Author: %s" % a for a in distinct_authors])
+
+ merge_message_flags += ["-m", authors]
+
+ if (reviewers != ""):
+ merge_message_flags += ["-m", "Reviewers: %s" % reviewers]
+
+ if had_conflicts:
+ committer_name = run_cmd("git config --get user.name").strip()
+ committer_email = run_cmd("git config --get user.email").strip()
+ message = "This patch had conflicts when merged, resolved by\nCommitter: %s <%s>" % (
+ committer_name, committer_email)
+ merge_message_flags += ["-m", message]
+
+ # The string "Closes #%s" string is required for GitHub to correctly close the PR
+ close_line = "Closes #%s from %s" % (pr_num, pr_repo_desc)
+ if should_list_commits:
+ close_line += " and squashes the following commits:"
+ merge_message_flags += ["-m", close_line]
+
+ if should_list_commits:
+ merge_message_flags += ["-m", "\n".join(commits)]
+
+ run_cmd(['git', 'commit', '--author="%s"' % primary_author] + merge_message_flags)
+
+ continue_maybe("Merge complete (local ref %s). Push to %s?" % (
+ target_branch_name, PUSH_REMOTE_NAME))
+
+ try:
+ run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, target_branch_name, target_ref))
+ except Exception as e:
+ clean_up()
+ fail("Exception while pushing: %s" % e)
+
+ merge_hash = run_cmd("git rev-parse %s" % target_branch_name)[:8]
+ clean_up()
+ print("Pull request #%s merged!" % pr_num)
+ print("Merge hash: %s" % merge_hash)
+ return merge_hash
+
+
+def cherry_pick(pr_num, merge_hash, default_branch):
+ pick_ref = raw_input("Enter a branch name [%s]: " % default_branch)
+ if pick_ref == "":
+ pick_ref = default_branch
+
+ pick_branch_name = "%s_PICK_PR_%s_%s" % (TEMP_BRANCH_PREFIX, pr_num, pick_ref.upper())
+
+ run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, pick_ref, pick_branch_name))
+ run_cmd("git checkout %s" % pick_branch_name)
+
+ try:
+ run_cmd("git cherry-pick -sx %s" % merge_hash)
+ except Exception as e:
+ msg = "Error cherry-picking: %s\nWould you like to manually fix-up this merge?" % e
+ continue_maybe(msg)
+ msg = "Okay, please fix any conflicts and finish the cherry-pick. Finished?"
+ continue_maybe(msg)
+
+ continue_maybe("Pick complete (local ref %s). Push to %s?" % (
+ pick_branch_name, PUSH_REMOTE_NAME))
+
+ try:
+ run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, pick_branch_name, pick_ref))
+ except Exception as e:
+ clean_up()
+ fail("Exception while pushing: %s" % e)
+
+ pick_hash = run_cmd("git rev-parse %s" % pick_branch_name)[:8]
+ clean_up()
+
+ print("Pull request #%s picked into %s!" % (pr_num, pick_ref))
+ print("Pick hash: %s" % pick_hash)
+ return pick_ref
+
+
+def fix_version_from_branch(branch, versions):
+ """
+ Returns fix-version from the branch names
+ :param branch:
+ :param versions: List of sorted (newest->oldest) list of unrelease versions (includes "master" branch)
+ :return: Returns "master" if the current branch is master. Otherwise, it returns the oldest branch version
+ that starts with "master". Else, it returns None.
+ """
+ if branch == "master":
+ if len(versions) > 0:
+ return versions[0]
+ else:
+ return None
+ else:
+ v = filter(lambda x: x.startswith(branch), versions)
+ if len(v) > 0:
+ return v[-1]
+ else:
+ print("Could not find branch %s in versions: %s" % (branch, versions))
+ return None
+
+
+def resolve_jira_issue(title, merge_branches, comment):
+ """
+ Updates the assignee (if not already provided) and fix-versions before resolving the JIRA
+
+ :param title: Standardized commit title
+ :param merge_branches: Default fix-versions for this JIRA
+ :param comment: Comment added to the JIRA
+ """
+ asf_jira = jira.client.JIRA({'server': JIRA_API_BASE},
+ basic_auth=(JIRA_USERNAME, JIRA_PASSWORD))
+ default_jira_id = ""
+ jira_ids = re.findall("%s-[0-9]{4,5}" % CAPITALIZED_PROJECT_NAME, title)
+ if len(jira_ids) > 0:
+ default_jira_id = jira_ids[0]
+
+ jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id)
+ if jira_id == "":
+ jira_id = default_jira_id
+
+ try:
+ issue = asf_jira.issue(jira_id)
+ except Exception as e:
+ fail("ASF JIRA could not find %s\n%s" % (jira_id, e))
+
+ cur_status = issue.fields.status.name
+ cur_summary = issue.fields.summary
+ cur_assignee = issue.fields.assignee
+ if cur_assignee is None:
+ cur_assignee = "NOT ASSIGNED!!!"
+ else:
+ cur_assignee = cur_assignee.displayName
+
+ if cur_status == "Resolved" or cur_status == "Closed":
+ fail("JIRA issue %s already has status '%s'" % (jira_id, cur_status))
+ print ("=== JIRA %s ===" % jira_id)
+ print ("summary\t\t%s\nassignee\t%s\nstatus\t\t%s\nurl\t\t%s/%s\n" % (
+ cur_summary, cur_assignee, cur_status, JIRA_BASE, jira_id))
+
+ versions = asf_jira.project_versions(CAPITALIZED_PROJECT_NAME)
+ versions = sorted(versions, key=lambda x: x.name, reverse=True)
+ versions = filter(lambda x: x.raw['released'] is False, versions)
+
+ version_names = map(lambda x: x.name, versions)
+ default_fix_versions = map(lambda x: fix_version_from_branch(x, version_names), merge_branches)
+ default_fix_versions = filter(lambda x: x != None, default_fix_versions)
+ default_fix_versions = ",".join(default_fix_versions)
+
+ fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % default_fix_versions)
+ if fix_versions == "":
+ fix_versions = default_fix_versions
+ fix_versions = fix_versions.replace(" ", "").split(",")
+
+ def get_version_json(version_str):
+ return filter(lambda v: v.name == version_str, versions)[0].raw
+
+ jira_fix_versions = map(lambda v: get_version_json(v), fix_versions)
+
+ resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0]
+ resolution = filter(lambda r: r.raw['name'] == "Fixed", asf_jira.resolutions())[0]
+ asf_jira.transition_issue(
+ jira_id, resolve["id"], fixVersions=jira_fix_versions,
+ comment=comment, resolution={'id': resolution.raw['id']})
+
+ print "Successfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions)
+
+
+def standardize_jira_ref(text):
+ """
+ Standardize the jira reference commit message prefix to "PROJECT_NAME-XXX: Issue"
+ >>> standardize_jira_ref("%s-5954: Top by key" % CAPITALIZED_PROJECT_NAME)
+ 'SAMZA-5954: Top by key'
+ >>> standardize_jira_ref("%s-5821: ParquetRelation2 CTAS should check if delete is successful" % PROJECT_NAME)
+ 'SAMZA-5821: ParquetRelation2 CTAS should check if delete is successful'
+ >>> standardize_jira_ref("%s-4123 [WIP] Show new dependencies added in pull requests" % PROJECT_NAME)
+ 'SAMZA-4123: [WIP] Show new dependencies added in pull requests'
+ >>> standardize_jira_ref("%s 5954: Top by key" % PROJECT_NAME)
+ 'SAMZA-5954: Top by key'
+ >>> standardize_jira_ref("%s-979 a LRU scheduler for load balancing in TaskSchedulerImpl" % PROJECT_NAME)
+ 'SAMZA-979: a LRU scheduler for load balancing in TaskSchedulerImpl'
+ >>> standardize_jira_ref("%s-1094 Support MiMa for reporting binary compatibility across versions." % CAPITALIZED_PROJECT_NAME)
+ 'SAMZA-1094: Support MiMa for reporting binary compatibility across versions.'
+ >>> standardize_jira_ref("[WIP] %s-1146: Vagrant support" % CAPITALIZED_PROJECT_NAME)
+ 'SAMZA-1146: [WIP] Vagrant support'
+ >>> standardize_jira_ref("%s-1032. If Yarn app fails before registering, app master stays aroun..." % PROJECT_NAME)
+ 'SAMZA-1032: If Yarn app fails before registering, app master stays aroun...'
+ >>> standardize_jira_ref("%s-6250 %s-6146 %s-5911: Types are now reserved words in DDL parser." % (PROJECT_NAME, PROJECT_NAME, CAPITALIZED_PROJECT_NAME))
+ 'SAMZA-6250 SAMZA-6146 SAMZA-5911: Types are now reserved words in DDL parser.'
+ >>> standardize_jira_ref("Additional information for users building from source code")
+ 'Additional information for users building from source code'
+
+ :param text: Text provided that will be used a commit message
+ :return: Standardized commit message that includes the JIRA number as well
+ """
+ jira_refs = []
+ components = []
+
+ # Extract JIRA ref(s):
+ pattern = re.compile(r'(%s[-\s]*[0-9]{3,6})+' % CAPITALIZED_PROJECT_NAME, re.IGNORECASE)
+ for ref in pattern.findall(text):
+ # Add brackets, replace spaces with a dash, & convert to uppercase
+ jira_refs.append(re.sub(r'\s+', '-', ref.upper()))
+ text = text.replace(ref, '')
+
+ # Extract project name component(s):
+ # Look for alphanumeric chars, spaces, dashes, periods, and/or commas
+ pattern = re.compile(r'(\[[\w\s,-\.]+\])', re.IGNORECASE)
+ for component in pattern.findall(text):
+ components.append(component.upper())
+ text = text.replace(component, '')
+
+ # Cleanup any remaining symbols:
+ pattern = re.compile(r'^\W+(.*)', re.IGNORECASE)
+ if (pattern.search(text) is not None):
+ text = pattern.search(text).groups()[0]
+
+ # Assemble full text (JIRA ref(s), module(s), remaining text)
+ jira_prefix = ' '.join(jira_refs).strip()
+ if jira_prefix:
+ jira_prefix = jira_prefix + ": "
+ clean_text = jira_prefix + ' '.join(components).strip() + " " + text.strip()
+
+ # Replace multiple spaces with a single space, e.g. if no jira refs and/or components were included
+ clean_text = re.sub(r'\s+', ' ', clean_text.strip())
+
+ return clean_text
+
+
+def main():
+ global original_head
+
+ original_head = get_current_branch()
+
+ latest_branch = "master"
+
+ pr_num = raw_input("Which pull request would you like to merge? (e.g. 34): ")
+ pr = get_json("%s/pulls/%s" % (GITHUB_API_BASE, pr_num))
+ pr_events = get_json("%s/issues/%s/events" % (GITHUB_API_BASE, pr_num))
+
+ url = pr["url"]
+
+ pr_title = pr["title"]
+ commit_title = raw_input("Commit title [%s]: " % pr_title.encode("utf-8")).decode("utf-8")
+ if commit_title == "":
+ commit_title = pr_title
+
+ # Decide whether to use the modified title or not
+ modified_title = standardize_jira_ref(commit_title)
+ if modified_title != commit_title:
+ print "I've re-written the title as follows to match the standard format:"
+ print "Original: %s" % commit_title
+ print "Modified: %s" % modified_title
+ result = raw_input("Would you like to use the modified title? (y/n): ")
+ if result.lower() == "y":
+ commit_title = modified_title
+ print "Using modified title:"
+ else:
+ print "Using original title:"
+ print commit_title
+
+ body = pr["body"]
+ target_ref = pr["base"]["ref"]
+ user_login = pr["user"]["login"]
+ base_ref = pr["head"]["ref"]
+ pr_repo_desc = "%s/%s" % (user_login, base_ref)
+
+ # Merged pull requests don't appear as merged in the GitHub API;
+ # Instead, they're closed by asfgit.
+ merge_commits = \
+ [e for e in pr_events if e["actor"]["login"] == "asfgit" and e["event"] == "closed"]
+
+ if merge_commits:
+ merge_hash = merge_commits[0]["commit_id"]
+ message = get_json("%s/commits/%s" % (GITHUB_API_BASE, merge_hash))["commit"]["message"]
+
+ print "Pull request %s has already been merged, assuming you want to backport" % pr_num
+ commit_is_downloaded = run_cmd(['git', 'rev-parse', '--quiet', '--verify',
+ "%s^{commit}" % merge_hash]).strip() != ""
+ if not commit_is_downloaded:
+ fail("Couldn't find any merge commit for #%s, you may need to update HEAD." % pr_num)
+
+ print "Found commit %s:\n%s" % (merge_hash, message)
+ cherry_pick(pr_num, merge_hash, latest_branch)
+ sys.exit(0)
+
+ if not bool(pr["mergeable"]):
+ msg = "Pull request %s is not mergeable in its current form.\n" % pr_num + \
+ "Continue? (experts only!)"
+ continue_maybe(msg)
+
+ print ("\n=== Pull Request #%s ===" % pr_num)
+ print ("PR title\t%s\nCommit title\t%s\nSource\t\t%s\nTarget\t\t%s\nURL\t\t%s" % (
+ pr_title, commit_title, pr_repo_desc, target_ref, url))
+ continue_maybe("Proceed with merging pull request #%s?" % pr_num)
+
+ merged_refs = [target_ref]
+
+ merge_hash = merge_pr(pr_num, target_ref, commit_title, body, pr_repo_desc)
+
+ pick_prompt = "Would you like to pick %s into another branch?" % merge_hash
+ while raw_input("\n%s (y/n): " % pick_prompt).lower() == "y":
+ merged_refs = merged_refs + [cherry_pick(pr_num, merge_hash, latest_branch)]
+
+ if JIRA_IMPORTED:
+ if JIRA_USERNAME and JIRA_PASSWORD:
+ continue_maybe("Would you like to update an associated JIRA?")
+ jira_comment = "Issue resolved by pull request %s\n[%s/%s]" % (pr_num, GITHUB_BASE, pr_num)
+ resolve_jira_issue(commit_title, merged_refs, jira_comment)
+ else:
+ print "JIRA_USERNAME and JIRA_PASSWORD not set"
+ print "Exiting without trying to close the associated JIRA."
+ else:
+ print "Could not find jira-python library. Run 'sudo pip install jira' to install."
+ print "Exiting without trying to close the associated JIRA."
+
+
+if __name__ == "__main__":
+ main()
[14/33] samza-hello-samza git commit: SAMZA-1038: Update hello-samza
master to use Samza 0.11.0
Posted by xi...@apache.org.
SAMZA-1038: Update hello-samza master to use Samza 0.11.0
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/e02c9564
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/e02c9564
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/e02c9564
Branch: refs/heads/master
Commit: e02c9564b7ea3d6d735d437c746021b041dfbd29
Parents: 32c9077
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Tue Oct 18 15:00:31 2016 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Tue Oct 18 15:00:31 2016 -0700
----------------------------------------------------------------------
gradle.properties | 2 +-
pom.xml | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e02c9564/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index fe71c66..3d8a0ea 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,7 +17,7 @@
* under the License.
*/
-SAMZA_VERSION=0.11.0-SNAPSHOT
+SAMZA_VERSION=0.11.1-SNAPSHOT
KAFKA_VERSION=0.8.2.1
HADOOP_VERSION=2.6.1
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e02c9564/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9cc44de..d90b1f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
<groupId>org.apache.samza</groupId>
<artifactId>hello-samza</artifactId>
- <version>0.11.0</version>
+ <version>0.11.1</version>
<packaging>jar</packaging>
<name>Samza Example</name>
<description>
@@ -143,7 +143,7 @@ under the License.
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>0.11.0-SNAPSHOT</samza.version>
+ <samza.version>0.11.1-SNAPSHOT</samza.version>
<hadoop.version>2.6.1</hadoop.version>
</properties>
[27/33] samza-hello-samza git commit: Support intellij in build.gradle
Posted by xi...@apache.org.
Support intellij in build.gradle
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/e7811dbf
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/e7811dbf
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/e7811dbf
Branch: refs/heads/master
Commit: e7811dbf7f9090740f6178f4e763d796a75091e2
Parents: 48c858a
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Authored: Mon Oct 30 16:26:18 2017 -0700
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Mon Oct 30 16:26:18 2017 -0700
----------------------------------------------------------------------
build.gradle | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e7811dbf/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index ec451d5..9d1f543 100644
--- a/build.gradle
+++ b/build.gradle
@@ -18,6 +18,7 @@
*/
apply plugin: 'eclipse'
+apply plugin: 'idea'
apply plugin: 'java'
defaultTasks 'distTar'
[28/33] samza-hello-samza git commit: SAMZA-1524: Azure checkpoint
and eventhubs fluent api standalone example
Posted by xi...@apache.org.
SAMZA-1524: Azure checkpoint and eventhubs fluent api standalone example
Tutorial and docs coming soon
Author: Daniel Chen <29...@users.noreply.github.com>
Reviewers: Jagadish<ja...@apache.org>
Closes #29 from dxichen/azure-examples
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/f48c7f9f
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/f48c7f9f
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/f48c7f9f
Branch: refs/heads/master
Commit: f48c7f9f3419e69821b9685dcffc3fd89803f8fa
Parents: e7811db
Author: Daniel Chen <29...@users.noreply.github.com>
Authored: Mon Dec 11 17:56:18 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Dec 11 17:56:18 2017 -0800
----------------------------------------------------------------------
bin/run-azure-application.sh | 30 ++++++++++
pom.xml | 5 ++
src/main/assembly/src.xml | 4 ++
.../azure-application-local-runner.properties | 49 ++++++++++++++++
.../samza/examples/azure/AzureApplication.java | 61 ++++++++++++++++++++
.../examples/azure/AzureZKLocalApplication.java | 42 ++++++++++++++
6 files changed, 191 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/bin/run-azure-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-azure-application.sh b/bin/run-azure-application.sh
new file mode 100755
index 0000000..8cd2463
--- /dev/null
+++ b/bin/run-azure-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p $EXECUTION_PLAN_DIR
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh samza.examples.azure.AzureZKLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/azure-application-local-runner.properties
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ada4c2b..5b2eb55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,11 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
+ <artifactId>samza-azure</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.samza</groupId>
<artifactId>samza-core_2.11</artifactId>
<version>${samza.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index c04ace0..8f3694e 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -48,6 +48,10 @@
<source>${basedir}/bin/run-wikipedia-zk-application.sh</source>
<outputDirectory>bin</outputDirectory>
</file>
+ <file>
+ <source>${basedir}/bin/run-azure-application.sh</source>
+ <outputDirectory>bin</outputDirectory>
+ </file>
</files>
<dependencySets>
<dependencySet>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/config/azure-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/azure-application-local-runner.properties b/src/main/config/azure-application-local-runner.properties
new file mode 100644
index 0000000..e440fd8
--- /dev/null
+++ b/src/main/config/azure-application-local-runner.properties
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.name=azure-application-local-runner
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.default.system=eventhubs
+job.coordinator.zk.connect=localhost:2181
+
+# Azure EventHubs System
+systems.eventhubs.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
+systems.eventhubs.stream.list=output-stream,input-stream
+
+# Add your EventHubs input stream credentials here
+systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
+systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
+systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
+systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+
+# Add your EventHubs output stream credentials here
+systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
+systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
+systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
+systems.eventhubs.streams.output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+
+# Azure Table Checkpoint Manager
+task.checkpoint.factory=org.apache.samza.checkpoint.azure.AzureCheckpointManagerFactory
+azure.storage.connect=YOUR-STORAGE-ACCOUNT-CONNECTION-STRING
+
+# Task/Application
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
+
+# Streams
+streams.input-stream.samza.system=eventhubs
+streams.output-stream.samza.system=eventhubs
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/java/samza/examples/azure/AzureApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java
new file mode 100644
index 0000000..9f565fe
--- /dev/null
+++ b/src/main/java/samza/examples/azure/AzureApplication.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.azure;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.serializers.ByteSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+
+/**
+ * Example stream application that copies key/value events from {@code input-stream} to
+ * {@code output-stream}, dropping events with a null key and logging each forwarded event to stdout.
+ */
+public class AzureApplication implements StreamApplication {
+
+  // Inputs
+  private static final String INPUT_STREAM_ID = "input-stream";
+
+  // Outputs
+  private static final String OUTPUT_STREAM_ID = "output-stream";
+
+  /**
+   * Builds the pipeline: input stream, filter on non-null key, log, then send to the output stream.
+   *
+   * @param graph  the stream graph the input, operators and output are registered on
+   * @param config the job configuration (not read by this application)
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    // Input: no serde is passed here, so the KV<String, byte[]> element type presumably comes from
+    // the system's configured/default serde -- confirm against the job config.
+    MessageStream<KV<String, byte[]>> eventhubInput = graph.getInputStream(INPUT_STREAM_ID);
+
+    // Output: String keys, raw byte[] values.
+    OutputStream<KV<String, byte[]>> eventhubOutput =
+        graph.getOutputStream(OUTPUT_STREAM_ID, KVSerde.of(new StringSerde(), new ByteSerde()));
+
+    // Send
+    eventhubInput
+        .filter((message) -> message.getKey() != null)
+        .map((message) -> {
+          System.out.println("Sending: ");
+          System.out.println("Received Key: " + message.getKey());
+          System.out.println("Received Message: " + decode(message.getValue()));
+          return message;
+        })
+        .sendTo(eventhubOutput);
+  }
+
+  /**
+   * Renders a message payload for logging. Decodes explicitly as UTF-8 so the output does not depend
+   * on the JVM's default charset, and returns "null" for a null payload instead of throwing
+   * (the filter above only guards against null keys).
+   */
+  private static String decode(byte[] payload) {
+    return payload == null ? "null" : new String(payload, StandardCharsets.UTF_8);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/azure/AzureZKLocalApplication.java b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
new file mode 100644
index 0000000..3d4f8b0
--- /dev/null
+++ b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.azure;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.Config;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.util.CommandLine;
+import samza.examples.azure.AzureApplication;
+
+/**
+ * Command-line launcher that runs {@link AzureApplication} in-process with a {@link LocalApplicationRunner}.
+ */
+public class AzureZKLocalApplication {
+
+  /**
+   * Parses the arguments with Samza's {@link CommandLine}, loads the job config they point at,
+   * then runs the application and waits on the runner.
+   *
+   * @param args command-line arguments (e.g. --config-factory and --config-path, as passed by
+   *             run-azure-application.sh)
+   */
+  public static void main(String[] args) {
+    CommandLine commandLine = new CommandLine();
+    OptionSet parsedOptions = commandLine.parser().parse(args);
+    Config jobConfig = commandLine.loadConfig(parsedOptions);
+
+    LocalApplicationRunner localRunner = new LocalApplicationRunner(jobConfig);
+    localRunner.run(new AzureApplication());
+
+    // Presumably blocks until the application stops -- confirm against LocalApplicationRunner.
+    localRunner.waitForFinish();
+  }
+}
[03/33] samza-hello-samza git commit: SAMZA-663: change README-gradle
to markdown format
Posted by xi...@apache.org.
SAMZA-663: change README-gradle to markdown format
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/c0ef56c1
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/c0ef56c1
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/c0ef56c1
Branch: refs/heads/master
Commit: c0ef56c1aecafce2569ad3c377a42f4926eab19f
Parents: 28d1ca1
Author: Yan Fang <ya...@gmail.com>
Authored: Wed Apr 29 09:48:41 2015 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Dec 21 17:22:19 2015 -0800
----------------------------------------------------------------------
README-gradle.md | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++
README-gradle.txt | 51 --------------------------------------------------
2 files changed, 51 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c0ef56c1/README-gradle.md
----------------------------------------------------------------------
diff --git a/README-gradle.md b/README-gradle.md
new file mode 100644
index 0000000..a564427
--- /dev/null
+++ b/README-gradle.md
@@ -0,0 +1,51 @@
+
+To use gradle to build/run the hello-samza project:
+
+1) the project is configured to download and use Gradle version 2.3; on first task execution, it will download the required Gradle jars.
+
+2) download/install yarn/kafka/zookeeper:
+
+ $ ./gradlew installGrid
+
+3) build hello-samza job package:
+
+ $ ./gradlew distTar
+
+4) deploy hello-samza project to grid:
+
+ $ ./gradlew deployHelloSamza
+
+5) start the grid (starts up yarn/kafka/zookeeper):
+
+ $ ./gradlew startGrid
+
+6) run the various Samza tasks that are part of hello-samza project:
+
+ $ ./gradlew runWikiFeed
+ $ ./gradlew runWikiParser
+ $ ./gradlew runWikiStats
+
+7) view all the current Kafka topics:
+
+ $ ./gradlew listKafkaTopics
+
+8) view the Kafka topics output by the various Samza tasks:
+
+ $ ./gradlew dumpWikiRaw
+ (output of the Kafka topic scrolls by)
+ Ctrl-C
+
+ $ ./gradlew dumpWikiEdits
+ (output of the Kafka topic scrolls by)
+ Ctrl-C
+
+ $ ./gradlew dumpWikiStats
+ (output of the Kafka topic scrolls by)
+ Ctrl-C
+
+9) stop all the components:
+
+ $ ./gradlew stopGrid
+
+Shortcut: using the 'runWiki*' tasks directly will do steps 3-6 automatically.
+
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c0ef56c1/README-gradle.txt
----------------------------------------------------------------------
diff --git a/README-gradle.txt b/README-gradle.txt
deleted file mode 100644
index a564427..0000000
--- a/README-gradle.txt
+++ /dev/null
@@ -1,51 +0,0 @@
-
-To use gradle to build/run the hello-samza project:
-
-1) the project is configured to download and use gradle version 2.3 - on first task execution, it will download the required gradle jars.
-
-2) download/install yarn/kafka/zookeeper:
-
- $ ./gradlew installGrid
-
-3) build hello-samza job package:
-
- $ ./gradlew distTar
-
-4) deploy hello-samza project to grid:
-
- $ ./gradlew deployHelloSamza
-
-5) start the grid (starts up yarn/kafka/zookeeper):
-
- $ ./gradlew startGrid
-
-6) run the various Samza tasks that are part of hello-samza project:
-
- $ ./gradlew runWikiFeed
- $ ./gradlew runWikiParser
- $ ./gradlew runWikiStats
-
-7) view all the current Kafka topics:
-
- $ ./gradlew listKafkaTopics
-
-8) view the Kafka topics output by the various Samza tasks:
-
- $ ./gradlew dumpWikiRaw
- ( output of Kafka topic scrolls by)
- CTRL-c
-
- $ ./gradlew dumpWikiEdits
- ( output of Kafka topic scrolls by)
- CTRL-c
-
- $ ./gradlew dumpWikiStats
- ( output of Kafka topic scrolls by)
- CTRL-c
-
-9) stop all the components:
-
- $ ./gradlew stopGrid
-
-Shortcut: using the 'runWiki*' tasks directly will do steps 3-6 automatically.
-