You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2019/01/04 20:43:09 UTC
[samza-beam-examples] 01/01: Initial commit
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza-beam-examples.git
commit 5e96162b5aaa5a0eaf2fb8c76aa72e43abd21020
Author: xiliu <xi...@linkedin.com>
AuthorDate: Fri Jan 4 12:42:16 2019 -0800
Initial commit
Add word count example from Beam
---
.gitignore | 33 +++
pom.xml | 301 +++++++++++++++++++++
scripts/grid | 243 +++++++++++++++++
src/main/assembly/samza.xml | 52 ++++
src/main/bash/run-beam-container.sh | 34 +++
src/main/bash/run-beam-yarn.sh | 44 +++
src/main/config/word-count-standalone.properties | 30 ++
src/main/config/word-count-yarn.properties | 29 ++
.../java/org/apache/beam/examples/WordCount.java | 177 ++++++++++++
src/main/resources/log4j.xml | 52 ++++
10 files changed, 995 insertions(+)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f31af00
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+*.class
+*.war
+*.ear
+target/
+.classpath
+.project
+.vagrant
+.settings/
+.idea/
+.idea_modules/
+*.iml
+*.ipr
+*.iws
+*/.cache
+deploy
+*.swp
+build/
+.gradle/
+state
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..fda4802
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,301 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-beam-examples</artifactId>
+ <version>0.1</version>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <beam.version>2.9.0</beam.version>
+ <samza.version>0.14.1</samza.version>
+
+ <bigquery.version>v2-rev20181104-1.27.0</bigquery.version>
+ <google-clients.version>1.27.0</google-clients.version>
+ <guava.version>20.0</guava.version>
+ <hamcrest.version>1.3</hamcrest.version>
+ <jackson.version>2.9.5</jackson.version>
+ <joda.version>2.4</joda.version>
+ <junit.version>4.12</junit.version>
+ <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
+ <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
+ <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
+ <mockito.version>1.10.19</mockito.version>
+ <pubsub.version>v1-rev20181105-1.27.0</pubsub.version>
+ <slf4j.version>1.7.25</slf4j.version>
+ <spark.version>2.3.2</spark.version>
+ <hadoop.version>2.7.3</hadoop.version>
+ <maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>apache.snapshots</id>
+ <name>Apache Development Snapshot Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${maven-compiler-plugin.version}</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven-surefire-plugin.version}</version>
+ <configuration>
+ <parallel>all</parallel>
+ <threadCount>4</threadCount>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.surefire</groupId>
+ <artifactId>surefire-junit47</artifactId>
+ <version>${maven-surefire-plugin.version}</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+
+ <!-- Ensure that the Maven jar plugin runs before the Maven
+ assembly plugin by listing the plugin higher within the file. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>${maven-jar-plugin.version}</version>
+ </plugin>
+
+ <!-- plugin to build the tar.gz file filled with examples -->
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.3</version>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/samza.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>${maven-exec-plugin.version}</version>
+ <configuration>
+ <cleanupDaemonThreads>false</cleanupDaemonThreads>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>samza-runner</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-samza</artifactId>
+ <version>${beam.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <dependencies>
+ <!-- Adds a dependency on the Beam SDK. -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+
+ <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-samza</artifactId>
+ <version>${beam.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-shell</artifactId>
+ <classifier>dist</classifier>
+ <type>tgz</type>
+ <version>${samza.version}</version>
+ </dependency>
+
+ <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
+ <dependency>
+ <groupId>com.google.api-client</groupId>
+ <artifactId>google-api-client</artifactId>
+ <version>${google-clients.version}</version>
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-bigquery</artifactId>
+ <version>${bigquery.version}</version>
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client</artifactId>
+ <version>${google-clients.version}</version>
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-pubsub</artifactId>
+ <version>${pubsub.version}</version>
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>${joda.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+
+ <!-- Add slf4j API frontend binding with JUL backend -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <version>${slf4j.version}</version>
+ <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
+ <scope>runtime</scope>
+ </dependency>
+
+ <!-- Hamcrest and JUnit are required dependencies of PAssert,
+ which is used in the main code of DebuggingWordCount example. -->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <version>${hamcrest.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <version>${hamcrest.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ </dependency>
+
+ <!-- The DirectRunner is needed for unit tests. -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${beam.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/scripts/grid b/scripts/grid
new file mode 100755
index 0000000..bf2266a
--- /dev/null
+++ b/scripts/grid
@@ -0,0 +1,243 @@
+#!/bin/bash -e
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This script will download, set up, start, and stop servers for Kafka, YARN, and ZooKeeper.
+
+if [ -z "$JAVA_HOME" ]; then
+ if [ -x /usr/libexec/java_home ]; then
+ export JAVA_HOME="$(/usr/libexec/java_home)"
+ else
+ echo "JAVA_HOME not set. Exiting."
+ exit 1
+ fi
+fi
+
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+BASE_DIR=$(dirname $DIR)
+DEPLOY_ROOT_DIR=$BASE_DIR/deploy
+DOWNLOAD_CACHE_DIR=$HOME/.grid/download
+COMMAND=$1
+SYSTEM=$2
+
+DOWNLOAD_KAFKA=https://archive.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
+DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
+DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
+
+SERVICE_WAIT_TIMEOUT_SEC=20
+ZOOKEEPER_PORT=2181
+RESOURCEMANAGER_PORT=8032
+NODEMANAGER_PORT=8042
+KAFKA_PORT=9092
+
+bootstrap() {
+ echo "Bootstrapping the system..."
+ stop_all
+ rm -rf "$DEPLOY_ROOT_DIR"
+ mkdir "$DEPLOY_ROOT_DIR"
+ install_all
+ start_all
+ exit 0
+}
+
+standalone() {
+ echo "Setting up the system..."
+ stop_all
+ rm -rf "$DEPLOY_ROOT_DIR"
+ mkdir "$DEPLOY_ROOT_DIR"
+ install_all_without_yarn
+ start_all_without_yarn
+ exit 0
+}
+
+install_all() {
+ $DIR/grid install zookeeper
+ $DIR/grid install yarn
+ $DIR/grid install kafka
+}
+
+install_all_without_yarn() {
+ $DIR/grid install zookeeper
+ $DIR/grid install kafka
+}
+
+install_zookeeper() {
+ mkdir -p "$DEPLOY_ROOT_DIR"
+ install zookeeper $DOWNLOAD_ZOOKEEPER zookeeper-3.4.3
+ cp "$DEPLOY_ROOT_DIR/zookeeper/conf/zoo_sample.cfg" "$DEPLOY_ROOT_DIR/zookeeper/conf/zoo.cfg"
+}
+
+install_yarn() {
+ mkdir -p "$DEPLOY_ROOT_DIR"
+ install yarn $DOWNLOAD_YARN hadoop-2.6.1
+ cp "$BASE_DIR/conf/yarn-site.xml" "$DEPLOY_ROOT_DIR/yarn/etc/hadoop/yarn-site.xml"
+ if [ ! -f "$HOME/.grid/conf/yarn-site.xml" ]; then
+ mkdir -p "$HOME/.grid/conf"
+ cp "$BASE_DIR/conf/yarn-site.xml" "$HOME/.grid/conf/yarn-site.xml"
+ fi
+}
+
+install_kafka() {
+ mkdir -p "$DEPLOY_ROOT_DIR"
+ install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.1.1
+ # have to use SIGTERM since nohup appears to ignore SIGINT
+ # and Kafka switched to SIGINT in KAFKA-1031.
+ sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh
+ # in order to simplify the example jobs, set topics to have just 1 partition by default
+ sed -i.bak 's/^num\.partitions *=.*/num.partitions=1/' $DEPLOY_ROOT_DIR/kafka/config/server.properties
+}
+
+install() { # usage: install <name> <download-url> <top-level directory inside the archive>
+  DESTINATION_DIR="$DEPLOY_ROOT_DIR/$1"
+  DOWNLOAD_URL=$2
+  PACKAGE_DIR="$DOWNLOAD_CACHE_DIR/$3"
+  PACKAGE_FILE="$DOWNLOAD_CACHE_DIR/$(basename "$DOWNLOAD_URL")"
+  if [ -f "$PACKAGE_FILE" ]; then
+    echo "Using previously downloaded file $PACKAGE_FILE"
+  else
+    echo "Downloading $(basename "$DOWNLOAD_URL")..."
+    mkdir -p "$DOWNLOAD_CACHE_DIR"
+    curl -fL "$DOWNLOAD_URL" > "${PACKAGE_FILE}.tmp" # -f: fail on HTTP errors so an error page is never cached; -L: follow redirects
+    mv "${PACKAGE_FILE}.tmp" "$PACKAGE_FILE" # publish into the cache only after the download succeeded
+  fi
+  rm -rf "$DESTINATION_DIR" "$PACKAGE_DIR"
+  tar -xf "$PACKAGE_FILE" -C "$DOWNLOAD_CACHE_DIR" # the archive is expected to unpack into $PACKAGE_DIR
+  mv "$PACKAGE_DIR" "$DESTINATION_DIR"
+}
+
+start_all() {
+ $DIR/grid start zookeeper
+ $DIR/grid start yarn
+ $DIR/grid start kafka
+}
+
+start_all_without_yarn() {
+ $DIR/grid start zookeeper
+ $DIR/grid start kafka
+}
+
+start_zookeeper() {
+ if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh ]; then
+ cd $DEPLOY_ROOT_DIR/$SYSTEM
+ bin/zkServer.sh start
+ wait_for_service "zookeeper" $ZOOKEEPER_PORT
+ cd - > /dev/null
+ else
+ echo 'Zookeeper is not installed. Run: bin/grid install zookeeper'
+ fi
+}
+
+start_yarn() {
+ if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh ]; then
+ $DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh start resourcemanager
+ wait_for_service "resourcemanager" $RESOURCEMANAGER_PORT
+ $DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh start nodemanager
+ wait_for_service "nodemanager" $NODEMANAGER_PORT
+ else
+ echo 'YARN is not installed. Run: bin/grid install yarn'
+ fi
+}
+
+start_kafka() {
+ if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/kafka-server-start.sh ]; then
+ mkdir -p $DEPLOY_ROOT_DIR/$SYSTEM/logs
+ cd $DEPLOY_ROOT_DIR/$SYSTEM
+ nohup bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &
+ cd - > /dev/null
+ wait_for_service "kafka" $KAFKA_PORT
+ else
+ echo 'Kafka is not installed. Run: bin/grid install kafka'
+ fi
+}
+
+wait_for_service() {
+ local SERVICE_NAME=$1
+ local PORT=$2
+ echo "Waiting for $SERVICE_NAME to start..."
+ local CURRENT_WAIT_TIME=0
+ until $(nc -w 1 localhost $PORT); do
+ printf '.'
+ sleep 1
+ if [ $((++CURRENT_WAIT_TIME)) -eq $SERVICE_WAIT_TIMEOUT_SEC ]; then
+ printf "\nError: timed out while waiting for $SERVICE_NAME to start.\n"
+ exit 1
+ fi
+ done
+ printf '\n'
+ echo "$SERVICE_NAME has started";
+}
+
+stop_all() {
+ $DIR/grid stop kafka
+ $DIR/grid stop yarn
+ $DIR/grid stop zookeeper
+}
+
+stop_zookeeper() {
+ if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh ]; then
+ cd $DEPLOY_ROOT_DIR/$SYSTEM
+ bin/zkServer.sh stop
+ cd - > /dev/null
+ else
+ echo 'Zookeeper is not installed. Run: bin/grid install zookeeper'
+ fi
+}
+
+stop_yarn() {
+ if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh ]; then
+ $DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh stop resourcemanager
+ $DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh stop nodemanager
+ else
+ echo 'YARN is not installed. Run: bin/grid install yarn'
+ fi
+}
+
+stop_kafka() {
+ if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/kafka-server-stop.sh ]; then
+ cd $DEPLOY_ROOT_DIR/$SYSTEM
+ bin/kafka-server-stop.sh || true # tolerate nonzero exit status if Kafka isn't running
+ cd - > /dev/null
+ else
+ echo 'Kafka is not installed. Run: bin/grid install kafka'
+ fi
+}
+
+# Check arguments
+if [ "$COMMAND" == "bootstrap" ] && test -z "$SYSTEM"; then
+ bootstrap
+ exit 0
+elif [ "$COMMAND" == "standalone" ] && test -z "$SYSTEM"; then
+ standalone
+ exit 0
+elif (test -z "$COMMAND" && test -z "$SYSTEM") \
+ || ( [ "$COMMAND" == "help" ] || test -z "$COMMAND" || test -z "$SYSTEM"); then
+ echo
+ echo " Usage.."
+ echo
+ echo " $ grid"
+ echo " $ grid bootstrap"
+ echo " $ grid standalone"
+ echo " $ grid install [yarn|kafka|zookeeper|all]"
+ echo " $ grid start [yarn|kafka|zookeeper|all]"
+ echo " $ grid stop [yarn|kafka|zookeeper|all]"
+ echo
+ exit 1
+else
+ echo "EXECUTING: $COMMAND $SYSTEM"
+
+ "$COMMAND"_"$SYSTEM"
+fi
diff --git a/src/main/assembly/samza.xml b/src/main/assembly/samza.xml
new file mode 100644
index 0000000..4cfc895
--- /dev/null
+++ b/src/main/assembly/samza.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+  <id>dist</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <files>
+    <file>
+      <source>${basedir}/src/main/resources/log4j.xml</source>
+      <outputDirectory>lib</outputDirectory>
+    </file>
+    <file> <!-- job launcher; it sets task.execute to bin/run-beam-container.sh, so both scripts ship in bin -->
+      <source>${basedir}/src/main/bash/run-beam-yarn.sh</source>
+      <outputDirectory>bin</outputDirectory>
+    </file>
+    <file>
+      <source>${basedir}/src/main/bash/run-beam-container.sh</source>
+      <outputDirectory>bin</outputDirectory>
+    </file>
+  </files>
+  <dependencySets>
+    <dependencySet> <!-- unpacks the samza-shell dist tgz into bin -->
+      <outputDirectory>bin</outputDirectory>
+      <includes>
+        <include>org.apache.samza:samza-shell:tgz:dist:*</include>
+      </includes>
+      <fileMode>0744</fileMode>
+      <unpack>true</unpack>
+    </dependencySet>
+    <dependencySet>
+      <outputDirectory>lib</outputDirectory>
+      <includes>
+        <include>beam-runners-samza</include>
+      </includes>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+    </dependencySet>
+  </dependencySets>
+</assembly>
\ No newline at end of file
diff --git a/src/main/bash/run-beam-container.sh b/src/main/bash/run-beam-container.sh
new file mode 100755
index 0000000..9d70d5a
--- /dev/null
+++ b/src/main/bash/run-beam-container.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Check if server is set. If not - set server optimization
+[[ $JAVA_OPTS != *-server* ]] && export JAVA_OPTS="$JAVA_OPTS -server"
+
+# Set container ID system property for use in Log4J
+[[ $JAVA_OPTS != *-Dsamza.container.id* && ! -z "$SAMZA_CONTAINER_ID" ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.id=$SAMZA_CONTAINER_ID"
+
+# Set container name system property for use in Log4J
+[[ $JAVA_OPTS != *-Dsamza.container.name* && ! -z "$SAMZA_CONTAINER_ID" ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.name=samza-container-$SAMZA_CONTAINER_ID"
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+exec $(dirname $0)/run-class.sh $1 --config-factory=org.apache.beam.runners.samza.container.ContainerCfgFactory --config-path=none --config app.runner.class=org.apache.beam.runners.samza.container.BeamContainerRunner "$@"
\ No newline at end of file
diff --git a/src/main/bash/run-beam-yarn.sh b/src/main/bash/run-beam-yarn.sh
new file mode 100755
index 0000000..3003e5e
--- /dev/null
+++ b/src/main/bash/run-beam-yarn.sh
@@ -0,0 +1,44 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p $EXECUTION_PLAN_DIR
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+op=$(if [[ "$@" =~ (--operation=)([^ ]*) ]]; then echo "${BASH_REMATCH[2]}"; else echo "run"; fi)
+
+case $op in
+ run)
+ exec $(dirname $0)/run-class.sh $1 --config task.execute="bin/run-beam-container.sh $1" "$@"
+ ;;
+
+ kill)
+ exec $(dirname $0)/run-class.sh org.apache.samza.job.JobRunner "$@"
+ ;;
+
+ status)
+ exec $(dirname $0)/run-class.sh org.apache.samza.job.JobRunner "$@"
+ ;;
+esac
\ No newline at end of file
diff --git a/src/main/config/word-count-standalone.properties b/src/main/config/word-count-standalone.properties
new file mode 100644
index 0000000..4c492d5
--- /dev/null
+++ b/src/main/config/word-count-standalone.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# runner
+app.runner.class=org.apache.samza.runtime.LocalApplicationRunner
+
+# zk
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.coordinator.zk.connect=localhost:2181
+
+# default system
+job.default.system=kafka
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
diff --git a/src/main/config/word-count-yarn.properties b/src/main/config/word-count-yarn.properties
new file mode 100644
index 0000000..36d2fcf
--- /dev/null
+++ b/src/main/config/word-count-yarn.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# YARN package path
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+
+# default system
+job.default.system=kafka
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
diff --git a/src/main/java/org/apache/beam/examples/WordCount.java b/src/main/java/org/apache/beam/examples/WordCount.java
new file mode 100644
index 0000000..11e5816
--- /dev/null
+++ b/src/main/java/org/apache/beam/examples/WordCount.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * An example that counts words and includes Beam best practices.
+ *
+ * <p>For a detailed walkthrough of this example, see <a
+ * href="https://beam.apache.org/get-started/wordcount-example/">
+ * https://beam.apache.org/get-started/wordcount-example/ </a>
+ *
+ * <p>Basic concepts: Reading text files; counting a
+ * PCollection; writing to text files
+ *
+ * <p>New Concepts:
+ *
+ * <pre>
+ * 1. Executing a Pipeline both locally and using the selected runner
+ * 2. Using ParDo with static DoFns defined out-of-line
+ * 3. Building a composite transform
+ * 4. Defining your own pipeline options
+ * </pre>
+ *
+ * <p>To execute this pipeline with SamzaRunner:
+ *
+ * <pre>{@code
+ * $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
+ * -Dexec.args="--inputFile=pom.xml --output=counts --runner=SamzaRunner" -Psamza-runner
+ * }</pre>
+ *
+ */
+public class WordCount {
+ private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
+
+ /**
+ * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
+ * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it to
+ * a ParDo in the pipeline.
+ */
+ static class ExtractWordsFn extends DoFn<String, String> {
+ private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
+ private final Distribution lineLenDist =
+ Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
+
+ @ProcessElement
+ public void processElement(@Element String element, OutputReceiver<String> receiver) {
+ lineLenDist.update(element.length());
+ if (element.trim().isEmpty()) {
+ emptyLines.inc();
+ }
+
+ // Split the line into words.
+ String[] words = element.split(TOKENIZER_PATTERN, -1);
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ receiver.output(word);
+ }
+ }
+ }
+ }
+
+ /** A SimpleFunction that converts a Word and Count into a printable string. */
+ public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
+ @Override
+ public String apply(KV<String, Long> input) {
+ return input.getKey() + ": " + input.getValue();
+ }
+ }
+
+ /**
+ * A PTransform that converts a PCollection containing lines of text into a PCollection of
+ * formatted word counts.
+ *
+ * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
+ * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
+ * modular testing, and an improved monitoring experience.
+ */
+ public static class CountWords
+ extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
+ @Override
+ public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
+
+ // Convert lines of text into individual words.
+ PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
+
+ // Count the number of times each word occurs.
+ PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
+
+ return wordCounts;
+ }
+ }
+
+ /**
+ * Options supported by {@link WordCount}.
+ *
+ * <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments to
+ * be processed by the command-line parser, and specify default values for them. You can then
+ * access the options values in your pipeline code.
+ *
+ * <p>Inherits standard configuration options.
+ */
+ public interface WordCountOptions extends PipelineOptions {
+
+ /**
+ * Required: path or glob of the input text to read. No default is defined, so it must be
+ * supplied on the command line, e.g. {@code --inputFile=pom.xml}.
+ */
+ @Description("Path of the file to read from")
+ @Required
+ String getInputFile();
+
+ void setInputFile(String value);
+
+ /** Set this required option to specify where to write the output. */
+ @Description("Path of the file to write to")
+ @Required
+ String getOutput();
+
+ void setOutput(String value);
+ }
+
+ static void runWordCount(WordCountOptions options) {
+ Pipeline p = Pipeline.create(options);
+
+ // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, then formats
+ // each word count as text via MapElements with the static FormatAsTextFn.
+ p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+ .apply(new CountWords())
+ .apply(MapElements.via(new FormatAsTextFn()))
+ .apply("WriteCounts", TextIO.write().to(options.getOutput()).withoutSharding());
+
+ p.run().waitUntilFinish();
+ }
+
+ public static void main(String[] args) {
+ WordCountOptions options =
+ PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
+ options.setJobName("word-count");
+
+ runWordCount(options);
+ }
+}
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
new file mode 100644
index 0000000..805d5ca
--- /dev/null
+++ b/src/main/resources/log4j.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="jmx" class="org.apache.samza.logging.log4j.JmxAppender" />
+ <!-- Main container log: rolls over at 256MB and keeps up to 20 backup files. -->
+ <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
+ <param name="MaxFileSize" value="256MB" />
+ <param name="MaxBackupIndex" value="20" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+ </layout>
+ </appender>
+ <appender name="StartupAppender" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="${samza.log.dir}/${samza.container.name}-startup.log" />
+ <param name="MaxFileSize" value="256MB" />
+ <param name="MaxBackupIndex" value="1" /> <!-- keep a single backup of the startup log -->
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+ </layout>
+ </appender>
+ <logger name="STARTUP_LOGGER" additivity="false"> <!-- not forwarded to the root appenders -->
+ <level value="info" />
+ <appender-ref ref="StartupAppender"/>
+ </logger>
+ <root>
+ <level value="info" />
+ <appender-ref ref="RollingAppender"/>
+ <appender-ref ref="jmx" />
+ </root>
+</log4j:configuration>