You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2019/01/04 20:43:09 UTC

[samza-beam-examples] 01/01: Initial commit

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza-beam-examples.git

commit 5e96162b5aaa5a0eaf2fb8c76aa72e43abd21020
Author: xiliu <xi...@linkedin.com>
AuthorDate: Fri Jan 4 12:42:16 2019 -0800

    Initial commit
    
    Add word count example from Beam
---
 .gitignore                                         |  33 +++
 pom.xml                                            | 301 +++++++++++++++++++++
 scripts/grid                                       | 243 +++++++++++++++++
 src/main/assembly/samza.xml                        |  52 ++++
 src/main/bash/run-beam-container.sh                |  34 +++
 src/main/bash/run-beam-yarn.sh                     |  44 +++
 src/main/config/word-count-standalone.properties   |  30 ++
 src/main/config/word-count-yarn.properties         |  29 ++
 .../java/org/apache/beam/examples/WordCount.java   | 177 ++++++++++++
 src/main/resources/log4j.xml                       |  52 ++++
 10 files changed, 995 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f31af00
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+*.class
+*.war
+*.ear
+target/
+.classpath
+.project
+.vagrant
+.settings/
+.idea/
+.idea_modules/
+*.iml
+*.ipr
+*.iws
+*/.cache
+deploy
+*.swp
+build/
+.gradle/
+state
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..fda4802
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,301 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.samza</groupId>
+  <artifactId>samza-beam-examples</artifactId>
+  <version>0.1</version>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <beam.version>2.9.0</beam.version>
+    <samza.version>0.14.1</samza.version>
+
+    <bigquery.version>v2-rev20181104-1.27.0</bigquery.version>
+    <google-clients.version>1.27.0</google-clients.version>
+    <guava.version>20.0</guava.version>
+    <hamcrest.version>1.3</hamcrest.version>
+    <jackson.version>2.9.5</jackson.version>
+    <joda.version>2.4</joda.version>
+    <junit.version>4.12</junit.version>
+    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
+    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
+    <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
+    <mockito.version>1.10.19</mockito.version>
+    <pubsub.version>v1-rev20181105-1.27.0</pubsub.version>
+    <slf4j.version>1.7.25</slf4j.version>
+    <spark.version>2.3.2</spark.version>
+    <hadoop.version>2.7.3</hadoop.version>
+    <maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>apache.snapshots</id>
+      <name>Apache Development Snapshot Repository</name>
+      <url>https://repository.apache.org/content/repositories/snapshots/</url>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>${maven-compiler-plugin.version}</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${maven-surefire-plugin.version}</version>
+        <configuration>
+          <parallel>all</parallel>
+          <threadCount>4</threadCount>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+        </configuration>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.maven.surefire</groupId>
+            <artifactId>surefire-junit47</artifactId>
+            <version>${maven-surefire-plugin.version}</version>
+          </dependency>
+        </dependencies>
+      </plugin>
+
+      <!-- Ensure that the Maven jar plugin runs before the Maven
+        assembly plugin by listing the plugin higher within the file. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>${maven-jar-plugin.version}</version>
+      </plugin>
+
+      <!-- plugin to build the tar.gz file filled with examples -->
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.3</version>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assembly/samza.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>exec-maven-plugin</artifactId>
+          <version>${maven-exec-plugin.version}</version>
+          <configuration>
+            <cleanupDaemonThreads>false</cleanupDaemonThreads>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>samza-runner</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-samza</artifactId>
+          <version>${beam.version}</version>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
+  <dependencies>
+    <!-- Adds a dependency on the Beam SDK. -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+
+    <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-samza</artifactId>
+      <version>${beam.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-shell</artifactId>
+      <classifier>dist</classifier>
+      <type>tgz</type>
+      <version>${samza.version}</version>
+    </dependency>
+
+    <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
+    <dependency>
+      <groupId>com.google.api-client</groupId>
+      <artifactId>google-api-client</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-bigquery</artifactId>
+      <version>${bigquery.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-pubsub</artifactId>
+      <version>${pubsub.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>${joda.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+
+    <!-- Add slf4j API frontend binding with JUL backend -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>${slf4j.version}</version>
+      <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
+      <scope>runtime</scope>
+    </dependency>
+
+    <!-- Hamcrest and JUnit are required dependencies of PAssert,
+         which is used in the main code of DebuggingWordCount example. -->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <version>${hamcrest.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <version>${hamcrest.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+    </dependency>
+
+    <!-- The DirectRunner is needed for unit tests. -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${beam.version}</version>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/scripts/grid b/scripts/grid
new file mode 100755
index 0000000..bf2266a
--- /dev/null
+++ b/scripts/grid
@@ -0,0 +1,243 @@
+#!/bin/bash -e
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This script will download, set up, start, and stop servers for Kafka, YARN, and ZooKeeper.
+
+# Fall back to /usr/libexec/java_home when JAVA_HOME is unset; abort if it is still empty afterwards.
+if [ -z "$JAVA_HOME" ] && [ -x /usr/libexec/java_home ]; then
+  export JAVA_HOME="$(/usr/libexec/java_home)"
+fi
+if [ -z "$JAVA_HOME" ]; then
+  echo "JAVA_HOME not set. Exiting."
+  exit 1
+fi
+
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" # absolute directory containing this script
+BASE_DIR=$(dirname "$DIR") # parent of the script directory; quoted so paths with spaces survive
+DEPLOY_ROOT_DIR=$BASE_DIR/deploy
+DOWNLOAD_CACHE_DIR=$HOME/.grid/download
+COMMAND=$1
+SYSTEM=$2
+
+DOWNLOAD_KAFKA=https://archive.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
+DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
+DOWNLOAD_ZOOKEEPER=https://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
+
+SERVICE_WAIT_TIMEOUT_SEC=20
+ZOOKEEPER_PORT=2181
+RESOURCEMANAGER_PORT=8032
+NODEMANAGER_PORT=8042
+KAFKA_PORT=9092
+
+bootstrap() { # stop all services, recreate the deploy dir, then install and start everything; always exits 0
+  echo "Bootstrapping the system..."
+  stop_all
+  rm -rf "$DEPLOY_ROOT_DIR"
+  mkdir "$DEPLOY_ROOT_DIR"
+  install_all
+  start_all
+  exit 0
+}
+
+standalone() { # like bootstrap, but installs and starts only the non-YARN services; always exits 0
+  echo "Setting up the system..."
+  stop_all
+  rm -rf "$DEPLOY_ROOT_DIR"
+  mkdir "$DEPLOY_ROOT_DIR"
+  install_all_without_yarn
+  start_all_without_yarn
+  exit 0
+}
+
+install_all() { # install ZooKeeper, YARN and Kafka, each through a fresh invocation of this script
+  "$DIR/grid" install zookeeper
+  "$DIR/grid" install yarn
+  "$DIR/grid" install kafka
+}
+
+install_all_without_yarn() { # install ZooKeeper and Kafka only
+  "$DIR/grid" install zookeeper
+  "$DIR/grid" install kafka
+}
+
+install_zookeeper() { # unpack ZooKeeper into the deploy dir and use its sample config as zoo.cfg
+  mkdir -p "$DEPLOY_ROOT_DIR"
+  install zookeeper $DOWNLOAD_ZOOKEEPER zookeeper-3.4.3
+  cp "$DEPLOY_ROOT_DIR/zookeeper/conf/zoo_sample.cfg" "$DEPLOY_ROOT_DIR/zookeeper/conf/zoo.cfg"
+}
+
+install_yarn() { # unpack Hadoop as "yarn" in the deploy dir and put the project's yarn-site.xml in place
+  mkdir -p "$DEPLOY_ROOT_DIR"
+  install yarn $DOWNLOAD_YARN hadoop-2.6.1
+  cp "$BASE_DIR/conf/yarn-site.xml" "$DEPLOY_ROOT_DIR/yarn/etc/hadoop/yarn-site.xml" # NOTE(review): conf/yarn-site.xml is not among the files this commit adds - confirm it exists, otherwise cp aborts the script (bash -e)
+  if [ ! -f "$HOME/.grid/conf/yarn-site.xml" ]; then # seed the per-user copy only if it is not there yet
+    mkdir -p "$HOME/.grid/conf"
+    cp "$BASE_DIR/conf/yarn-site.xml" "$HOME/.grid/conf/yarn-site.xml"
+  fi
+}
+
+install_kafka() { # unpack Kafka into the deploy dir, then patch its stop script and default partition count
+  mkdir -p "$DEPLOY_ROOT_DIR"
+  install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.1.1
+  # have to use SIGTERM since nohup appears to ignore SIGINT
+  # and Kafka switched to SIGINT in KAFKA-1031.
+  sed -i.bak 's/SIGINT/SIGTERM/g' "$DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh"
+  # in order to keep the example jobs simple, set topics to have just 1 partition by default
+  sed -i.bak 's/^num\.partitions *=.*/num.partitions=1/' "$DEPLOY_ROOT_DIR/kafka/config/server.properties"
+}
+
+install() { # usage: install <name> <download-url> <top-level dir inside the archive>; result lands in $DEPLOY_ROOT_DIR/<name>
+  local DESTINATION_DIR="$DEPLOY_ROOT_DIR/$1"
+  local DOWNLOAD_URL=$2
+  local PACKAGE_DIR="$DOWNLOAD_CACHE_DIR/$3"
+  local PACKAGE_FILE="$DOWNLOAD_CACHE_DIR/$(basename "$DOWNLOAD_URL")"
+  if [ -f "$PACKAGE_FILE" ]; then
+    echo "Using previously downloaded file $PACKAGE_FILE"
+  else
+    echo "Downloading $(basename "$DOWNLOAD_URL")..."
+    mkdir -p "$DOWNLOAD_CACHE_DIR"
+    curl -fL "$DOWNLOAD_URL" > "${PACKAGE_FILE}.tmp" # -f: an HTTP error fails here (bash -e aborts) instead of caching an error page; -L: follow redirects
+    mv "${PACKAGE_FILE}.tmp" "$PACKAGE_FILE" # rename only after a complete download so a partial file is never reused
+  fi
+  rm -rf "$DESTINATION_DIR" "$PACKAGE_DIR"
+  tar -xf "$PACKAGE_FILE" -C "$DOWNLOAD_CACHE_DIR"
+  mv "$PACKAGE_DIR" "$DESTINATION_DIR"
+}
+
+start_all() { # start ZooKeeper, YARN and Kafka in that order, each through a fresh invocation of this script
+  "$DIR/grid" start zookeeper
+  "$DIR/grid" start yarn
+  "$DIR/grid" start kafka
+}
+
+start_all_without_yarn() { # start ZooKeeper, then Kafka
+  "$DIR/grid" start zookeeper
+  "$DIR/grid" start kafka
+}
+
+start_zookeeper() { # run zkServer.sh start from the install dir, then wait for the ZooKeeper port
+  if [ -f "$DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh" ]; then
+    cd "$DEPLOY_ROOT_DIR/$SYSTEM"
+    bin/zkServer.sh start
+    wait_for_service "zookeeper" $ZOOKEEPER_PORT
+    cd - > /dev/null
+  else
+    echo 'Zookeeper is not installed. Run: scripts/grid install zookeeper'
+  fi
+}
+
+start_yarn() { # start the resourcemanager, then the nodemanager, waiting for each one's port
+  if [ -f "$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh" ]; then
+    "$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh" start resourcemanager
+    wait_for_service "resourcemanager" $RESOURCEMANAGER_PORT
+    "$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh" start nodemanager
+    wait_for_service "nodemanager" $NODEMANAGER_PORT
+  else
+    echo 'YARN is not installed. Run: scripts/grid install yarn'
+  fi
+}
+
+start_kafka() { # launch the broker in the background (output goes to logs/kafka.log), then wait for its port
+  if [ -f "$DEPLOY_ROOT_DIR/$SYSTEM/bin/kafka-server-start.sh" ]; then
+    mkdir -p "$DEPLOY_ROOT_DIR/$SYSTEM/logs"
+    cd "$DEPLOY_ROOT_DIR/$SYSTEM"
+    nohup bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &
+    cd - > /dev/null
+    wait_for_service "kafka" $KAFKA_PORT
+  else
+    echo 'Kafka is not installed. Run: scripts/grid install kafka'
+  fi
+}
+
+wait_for_service() { # usage: wait_for_service <name> <port>; exits 1 once SERVICE_WAIT_TIMEOUT_SEC probes have failed
+  local SERVICE_NAME=$1
+  local PORT=$2
+  echo "Waiting for $SERVICE_NAME to start..."
+  local CURRENT_WAIT_TIME=0
+  until nc -w 1 localhost "$PORT" < /dev/null > /dev/null 2>&1; do # probe only: never run what the port sends back as a command
+    printf '.'
+    sleep 1
+    if [ $((++CURRENT_WAIT_TIME)) -eq $SERVICE_WAIT_TIMEOUT_SEC ]; then
+      printf '\nError: timed out while waiting for %s to start.\n' "$SERVICE_NAME"
+      exit 1
+    fi
+  done
+  printf '\n'
+  echo "$SERVICE_NAME has started";
+}
+
+stop_all() { # stop Kafka, YARN and ZooKeeper in that order, each through a fresh invocation of this script
+  "$DIR/grid" stop kafka
+  "$DIR/grid" stop yarn
+  "$DIR/grid" stop zookeeper
+}
+
+stop_zookeeper() { # run zkServer.sh stop from the install dir
+  if [ -f "$DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh" ]; then
+    cd "$DEPLOY_ROOT_DIR/$SYSTEM"
+    bin/zkServer.sh stop
+    cd - > /dev/null
+  else
+    echo 'Zookeeper is not installed. Run: scripts/grid install zookeeper'
+  fi
+}
+
+stop_yarn() { # stop the resourcemanager, then the nodemanager
+  if [ -f "$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh" ]; then
+    "$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh" stop resourcemanager
+    "$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh" stop nodemanager
+  else
+    echo 'YARN is not installed. Run: scripts/grid install yarn'
+  fi
+}
+
+stop_kafka() { # run kafka-server-stop.sh from the install dir
+  if [ -f "$DEPLOY_ROOT_DIR/$SYSTEM/bin/kafka-server-stop.sh" ]; then
+    cd "$DEPLOY_ROOT_DIR/$SYSTEM"
+    bin/kafka-server-stop.sh || true # tolerate nonzero exit status if Kafka isn't running
+    cd - > /dev/null
+  else
+    echo 'Kafka is not installed. Run: scripts/grid install kafka'
+  fi
+}
+
+# Check arguments
+if [ "$COMMAND" == "bootstrap" ] && test -z "$SYSTEM"; then
+  bootstrap
+  exit 0
+elif [ "$COMMAND" == "standalone" ] && test -z "$SYSTEM"; then
+  standalone
+  exit 0
+elif (test -z "$COMMAND" && test -z "$SYSTEM") \
+  || ( [ "$COMMAND" == "help" ] || test -z "$COMMAND" || test -z "$SYSTEM"); then
+  echo
+  echo "  Usage.."
+  echo
+  echo "  $ grid"
+  echo "  $ grid bootstrap"
+  echo "  $ grid standalone"
+  echo "  $ grid install [yarn|kafka|zookeeper|all]"
+  echo "  $ grid start [yarn|kafka|zookeeper|all]"
+  echo "  $ grid stop [yarn|kafka|zookeeper|all]"
+  echo
+  exit 1
+else
+  echo "EXECUTING: $COMMAND $SYSTEM"
+
+  "$COMMAND"_"$SYSTEM"
+fi
diff --git a/src/main/assembly/samza.xml b/src/main/assembly/samza.xml
new file mode 100644
index 0000000..4cfc895
--- /dev/null
+++ b/src/main/assembly/samza.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  you under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+    <id>dist</id>
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <files> <!-- single files copied into the archive: log4j.xml into lib/, the launcher scripts into bin/ -->
+        <file>
+            <source>${basedir}/src/main/resources/log4j.xml</source>
+            <outputDirectory>lib</outputDirectory>
+        </file>
+        <file>
+            <source>${basedir}/src/main/bash/run-beam-yarn.sh</source>
+            <outputDirectory>bin</outputDirectory>
+        </file>
+        <file>
+            <source>${basedir}/src/main/bash/run-beam-container.sh</source>
+            <outputDirectory>bin</outputDirectory>
+        </file>
+    </files>
+    <dependencySets>
+        <dependencySet> <!-- the samza-shell dist tarball is unpacked into bin/ -->
+            <outputDirectory>bin</outputDirectory>
+            <includes>
+                <include>org.apache.samza:samza-shell:tgz:dist:*</include>
+            </includes>
+            <fileMode>0744</fileMode>
+            <unpack>true</unpack>
+        </dependencySet>
+        <dependencySet> <!-- beam-runners-samza goes into lib/; the include filter is applied transitively -->
+            <outputDirectory>lib</outputDirectory>
+            <includes>
+                <include>beam-runners-samza</include>
+            </includes>
+            <useTransitiveFiltering>true</useTransitiveFiltering>
+        </dependencySet>
+    </dependencySets>
+</assembly>
\ No newline at end of file
diff --git a/src/main/bash/run-beam-container.sh b/src/main/bash/run-beam-container.sh
new file mode 100755
index 0000000..9d70d5a
--- /dev/null
+++ b/src/main/bash/run-beam-container.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Check if server is set. If not - set server optimization
+[[ $JAVA_OPTS != *-server* ]] && export JAVA_OPTS="$JAVA_OPTS -server"
+
+# Set container ID system property for use in Log4J
+[[ $JAVA_OPTS != *-Dsamza.container.id* && ! -z "$SAMZA_CONTAINER_ID" ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.id=$SAMZA_CONTAINER_ID"
+
+# Set container name system property for use in Log4J
+[[ $JAVA_OPTS != *-Dsamza.container.name* && ! -z "$SAMZA_CONTAINER_ID" ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.name=samza-container-$SAMZA_CONTAINER_ID"
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+exec $(dirname $0)/run-class.sh $1 --config-factory=org.apache.beam.runners.samza.container.ContainerCfgFactory --config-path=none --config app.runner.class=org.apache.beam.runners.samza.container.BeamContainerRunner "$@"
\ No newline at end of file
diff --git a/src/main/bash/run-beam-yarn.sh b/src/main/bash/run-beam-yarn.sh
new file mode 100755
index 0000000..3003e5e
--- /dev/null
+++ b/src/main/bash/run-beam-yarn.sh
@@ -0,0 +1,44 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=$(pwd) # remember the caller's directory while base_dir is resolved to an absolute path
+base_dir=$(dirname "$0")/..
+cd "$base_dir"
+base_dir=$(pwd)
+cd "$home_dir"
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p "$EXECUTION_PLAN_DIR"
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname "$0")/log4j-console.xml"
+
+op=$(if [[ "$*" =~ (--operation=)([^ ]*) ]]; then echo "${BASH_REMATCH[2]}"; else echo "run"; fi) # --operation=<op> picks the action; defaults to run
+
+case $op in
+  run)
+    exec "$(dirname "$0")"/run-class.sh $1 --config task.execute="bin/run-beam-container.sh $1" "$@"
+  ;;
+
+  kill|status)
+    exec "$(dirname "$0")"/run-class.sh org.apache.samza.job.JobRunner "$@"
+  ;;
+
+  *)
+    echo "Unsupported --operation=$op (expected run, kill or status)" >&2; exit 1
+  ;;
+esac
\ No newline at end of file
diff --git a/src/main/config/word-count-standalone.properties b/src/main/config/word-count-standalone.properties
new file mode 100644
index 0000000..4c492d5
--- /dev/null
+++ b/src/main/config/word-count-standalone.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# runner
+app.runner.class=org.apache.samza.runtime.LocalApplicationRunner
+
+# zk
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.coordinator.zk.connect=localhost:2181
+
+# default system
+job.default.system=kafka
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
diff --git a/src/main/config/word-count-yarn.properties b/src/main/config/word-count-yarn.properties
new file mode 100644
index 0000000..36d2fcf
--- /dev/null
+++ b/src/main/config/word-count-yarn.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# YARN package path
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+
+# default system
+job.default.system=kafka
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
diff --git a/src/main/java/org/apache/beam/examples/WordCount.java b/src/main/java/org/apache/beam/examples/WordCount.java
new file mode 100644
index 0000000..11e5816
--- /dev/null
+++ b/src/main/java/org/apache/beam/examples/WordCount.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * An example that counts words and includes Beam best practices.
+ *
+ * <p>For a detailed walkthrough of this example, see <a
+ * href="https://beam.apache.org/get-started/wordcount-example/">
+ * https://beam.apache.org/get-started/wordcount-example/ </a>
+ *
+ * <p>Basic concepts: Reading text files; counting a
+ * PCollection; writing to text files
+ *
+ * <p>New Concepts:
+ *
+ * <pre>
+ *   1. Executing a Pipeline both locally and using the selected runner
+ *   2. Using ParDo with static DoFns defined out-of-line
+ *   3. Building a composite transform
+ *   4. Defining your own pipeline options
+ * </pre>
+ *
+ * <p>To execute this pipeline with SamzaRunner:
+ *
+ * <pre>{@code
+ * $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
+ *      -Dexec.args="--inputFile=pom.xml --output=counts --runner=SamzaRunner" -Psamza-runner
+ * }</pre>
+ *
+ */
+public class WordCount {
+  private static final String TOKENIZER_PATTERN = "[^\\p{L}]+"; // one or more non-letter chars
+
+  /**
+   * Concept #2: DoFns defined statically out-of-line keep the pipeline assembly code short. This
+   * DoFn splits each input line on runs of non-letter characters and emits every non-empty token.
+   * It also updates the "lineLenDistro" distribution and the "emptyLines" counter metrics.
+   */
+  static class ExtractWordsFn extends DoFn<String, String> {
+    // Compiled once at class load; String.split(regex, n) would recompile it for every element.
+    private static final java.util.regex.Pattern WORD_SEPARATOR =
+        java.util.regex.Pattern.compile(TOKENIZER_PATTERN);
+    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
+    private final Distribution lineLenDist =
+        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
+
+    @ProcessElement
+    public void processElement(@Element String element, OutputReceiver<String> receiver) {
+      lineLenDist.update(element.length());
+      if (element.trim().isEmpty()) {
+        emptyLines.inc();
+      }
+
+      // A limit of -1 keeps trailing empty tokens, so empty tokens are filtered out explicitly.
+      for (String word : WORD_SEPARATOR.split(element, -1)) {
+        if (!word.isEmpty()) {
+          receiver.output(word);
+        }
+      }
+    }
+  }
+
+  /** Renders a word/count pair as a single printable line of the form {@code word: count}. */
+  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
+    @Override
+    public String apply(KV<String, Long> input) {
+      return String.join(": ", input.getKey(), String.valueOf(input.getValue()));
+    }
+  }
+
+  /**
+   * A PTransform that converts a PCollection containing lines of text into a PCollection of
+   * word counts, where each element pairs a word with its number of occurrences.
+   *
+   * <p>The counts are emitted as {@code KV<String, Long>} values; rendering them as text is left
+   * to a separate step such as {@link FormatAsTextFn}.
+   *
+   * <p>Concept #3: A custom composite transform packages two transforms (ParDo and Count) into a
+   * single reusable PTransform subclass. Composite transforms allow for easy reuse, modular
+   * testing, and an improved monitoring experience.
+   */
+  public static class CountWords
+      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
+    @Override
+    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
+      return lines
+          // Tokenize every line of text into its individual words.
+          .apply(ParDo.of(new ExtractWordsFn()))
+          // Tally how many times each word occurs.
+          .apply(Count.perElement());
+    }
+  }
+
+  /**
+   * Options supported by {@link WordCount}.
+   *
+   * <p>Concept #4: Defining your own configuration options. Arguments added here are processed by
+   * the command-line parser, and their values can then be accessed in your pipeline code. Both
+   * options declared below are marked required.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  public interface WordCountOptions extends PipelineOptions {
+
+    /**
+     * Set this required option to choose the input file or glob to read from. No default is
+     * declared, so it must be supplied explicitly, e.g. {@code --inputFile=pom.xml}.
+     */
+    @Description("Path of the file to read from")
+    @Required
+    String getInputFile();
+
+    void setInputFile(String value);
+
+    /** Set this required option to specify where to write the output. */
+    @Description("Path of the file to write to")
+    @Required
+    String getOutput();
+
+    void setOutput(String value);
+  }
+
+  static void runWordCount(WordCountOptions options) {
+    Pipeline pipeline = Pipeline.create(options);
+    // Concepts #2 and #3: apply the composite CountWords transform, then format each count with
+    // the statically defined FormatAsTextFn through MapElements.
+    PCollection<String> lines =
+        pipeline.apply("ReadLines", TextIO.read().from(options.getInputFile()));
+    PCollection<String> formatted =
+        lines.apply(new CountWords()).apply(MapElements.via(new FormatAsTextFn()));
+    formatted.apply("WriteCounts", TextIO.write().to(options.getOutput()).withoutSharding());
+    // Run the pipeline and block until it finishes.
+    pipeline.run().waitUntilFinish();
+  }
+
+  public static void main(String[] args) {
+    // Parse the command-line flags with validation enabled, then set the job name.
+    PipelineOptionsFactory.Builder parser = PipelineOptionsFactory.fromArgs(args).withValidation();
+    WordCountOptions options = parser.as(WordCountOptions.class);
+    options.setJobName("word-count");
+    runWordCount(options);
+  }
+}
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
new file mode 100644
index 0000000..805d5ca
--- /dev/null
+++ b/src/main/resources/log4j.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="jmx" class="org.apache.samza.logging.log4j.JmxAppender" />
+  <!-- Container log: rolls over at 256MB and keeps up to 20 backup files. -->
+  <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
+    <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
+    <param name="MaxFileSize" value="256MB" />
+    <param name="MaxBackupIndex" value="20" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+    </layout>
+  </appender>
+  <appender name="StartupAppender" class="org.apache.log4j.RollingFileAppender">
+    <param name="File" value="${samza.log.dir}/${samza.container.name}-startup.log" />
+    <param name="MaxFileSize" value="256MB" />
+    <param name="MaxBackupIndex" value="1" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+    </layout>
+  </appender>
+  <logger name="STARTUP_LOGGER" additivity="false">
+    <level value="info" />
+    <appender-ref ref="StartupAppender" />
+  </logger>
+  <root>
+    <level value="info" />
+    <appender-ref ref="RollingAppender" />
+    <appender-ref ref="jmx" />
+  </root>
+</log4j:configuration>