You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2019/01/04 20:43:08 UTC

[samza-beam-examples] branch master created (now 5e96162)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/samza-beam-examples.git.


      at 5e96162  Initial commit

This branch includes the following new commits:

     new 5e96162  Initial commit

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[samza-beam-examples] 01/01: Initial commit

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza-beam-examples.git

commit 5e96162b5aaa5a0eaf2fb8c76aa72e43abd21020
Author: xiliu <xi...@linkedin.com>
AuthorDate: Fri Jan 4 12:42:16 2019 -0800

    Initial commit
    
    Add word count example from Beam
---
 .gitignore                                         |  33 +++
 pom.xml                                            | 301 +++++++++++++++++++++
 scripts/grid                                       | 243 +++++++++++++++++
 src/main/assembly/samza.xml                        |  52 ++++
 src/main/bash/run-beam-container.sh                |  34 +++
 src/main/bash/run-beam-yarn.sh                     |  44 +++
 src/main/config/word-count-standalone.properties   |  30 ++
 src/main/config/word-count-yarn.properties         |  29 ++
 .../java/org/apache/beam/examples/WordCount.java   | 177 ++++++++++++
 src/main/resources/log4j.xml                       |  52 ++++
 10 files changed, 995 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f31af00
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+*.class
+*.war
+*.ear
+target/
+.classpath
+.project
+.vagrant
+.settings/
+.idea/
+.idea_modules/
+*.iml
+*.ipr
+*.iws
+*/.cache
+deploy
+*.swp
+build/
+.gradle/
+state
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..fda4802
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,301 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.samza</groupId>
+  <artifactId>samza-beam-examples</artifactId>
+  <version>0.1</version>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <beam.version>2.9.0</beam.version>
+    <samza.version>0.14.1</samza.version>
+    <!-- Third-party library and Maven plugin versions, in alphabetical order. -->
+    <bigquery.version>v2-rev20181104-1.27.0</bigquery.version>
+    <google-clients.version>1.27.0</google-clients.version>
+    <guava.version>20.0</guava.version>
+    <hadoop.version>2.7.3</hadoop.version>
+    <hamcrest.version>1.3</hamcrest.version>
+    <jackson.version>2.9.5</jackson.version>
+    <joda.version>2.4</joda.version>
+    <junit.version>4.12</junit.version>
+    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
+    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
+    <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
+    <maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version>
+    <mockito.version>1.10.19</mockito.version>
+    <pubsub.version>v1-rev20181105-1.27.0</pubsub.version>
+    <slf4j.version>1.7.25</slf4j.version>
+    <spark.version>2.3.2</spark.version>
+  </properties>
+
+  <repositories>
+    <repository> <!-- Snapshot-only repository: used solely to resolve -SNAPSHOT versions. -->
+      <id>apache.snapshots</id>
+      <name>Apache Development Snapshot Repository</name>
+      <url>https://repository.apache.org/content/repositories/snapshots/</url>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>${maven-compiler-plugin.version}</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${maven-surefire-plugin.version}</version>
+        <configuration>
+          <parallel>all</parallel>
+          <threadCount>4</threadCount>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+        </configuration>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.maven.surefire</groupId>
+            <artifactId>surefire-junit47</artifactId>
+            <version>${maven-surefire-plugin.version}</version>
+          </dependency>
+        </dependencies>
+      </plugin>
+
+      <!-- Listed before maven-assembly-plugin so that, within the package phase, the
+        project jar is built before the dist tarball is assembled. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>${maven-jar-plugin.version}</version>
+      </plugin>
+
+      <!-- Builds the *-dist.tar.gz job package laid out by src/main/assembly/samza.xml. -->
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>3.1.0</version> <!-- 3.x matches the assembly-2.0.0 descriptor schema that samza.xml declares -->
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assembly/samza.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>exec-maven-plugin</artifactId>
+          <version>${maven-exec-plugin.version}</version>
+          <configuration>
+            <cleanupDaemonThreads>false</cleanupDaemonThreads>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>samza-runner</id> <!-- Currently a no-op: beam-runners-samza is also declared as a top-level runtime dependency. -->
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-samza</artifactId>
+          <version>${beam.version}</version>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
+  <dependencies>
+    <!-- Adds a dependency on the Beam SDK. -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+
+    <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+
+    <dependency> <!-- Beam's Samza runner; needed only at run time. -->
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-samza</artifactId>
+      <version>${beam.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-shell</artifactId>
+      <classifier>dist</classifier>
+      <type>tgz</type>
+      <version>${samza.version}</version>
+    </dependency>
+
+    <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
+    <dependency>
+      <groupId>com.google.api-client</groupId>
+      <artifactId>google-api-client</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-bigquery</artifactId>
+      <version>${bigquery.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-pubsub</artifactId>
+      <version>${pubsub.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>${joda.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+
+    <!-- Add slf4j API frontend binding with JUL backend -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>${slf4j.version}</version>
+      <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
+      <scope>runtime</scope>
+    </dependency>
+
+    <!-- Hamcrest and JUnit are required dependencies of PAssert. They stay in compile scope as in
+         the upstream Beam examples, where DebuggingWordCount uses PAssert in its main code. -->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <version>${hamcrest.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <version>${hamcrest.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+    </dependency>
+
+    <!-- The DirectRunner is needed for unit tests. -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${beam.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/scripts/grid b/scripts/grid
new file mode 100755
index 0000000..bf2266a
--- /dev/null
+++ b/scripts/grid
@@ -0,0 +1,243 @@
+#!/bin/bash -e
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This script downloads, sets up, starts, and stops servers for Kafka, YARN, and ZooKeeper.
+
+if [ -z "$JAVA_HOME" ]; then
+  if [ -x /usr/libexec/java_home ]; then  # macOS JDK locator
+    export JAVA_HOME="$(/usr/libexec/java_home)"
+  else
+    echo "JAVA_HOME not set. Exiting." >&2
+    exit 1
+  fi
+fi
+
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"  # directory containing this script
+BASE_DIR=$(dirname "$DIR")                               # project root
+DEPLOY_ROOT_DIR=$BASE_DIR/deploy                         # where the services are unpacked and run
+DOWNLOAD_CACHE_DIR=$HOME/.grid/download                  # downloaded tarballs are reused across runs
+COMMAND=$1
+SYSTEM=$2
+
+DOWNLOAD_KAFKA=https://archive.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
+DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
+DOWNLOAD_ZOOKEEPER=https://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
+
+SERVICE_WAIT_TIMEOUT_SEC=20  # failed one-second probes before wait_for_service gives up
+ZOOKEEPER_PORT=2181
+RESOURCEMANAGER_PORT=8032
+NODEMANAGER_PORT=8042
+KAFKA_PORT=9092
+
+bootstrap() {
+  echo "Bootstrapping the system..."
+  stop_all
+  rm -rf "$DEPLOY_ROOT_DIR"
+  mkdir "$DEPLOY_ROOT_DIR"
+  install_all
+  start_all
+  exit 0
+}
+
+standalone() {
+  echo "Setting up the system..."
+  stop_all
+  rm -rf "$DEPLOY_ROOT_DIR"
+  mkdir "$DEPLOY_ROOT_DIR"
+  install_all_without_yarn
+  start_all_without_yarn
+  exit 0
+}
+
+install_all() {
+  $DIR/grid install zookeeper
+  $DIR/grid install yarn
+  $DIR/grid install kafka
+}
+
+install_all_without_yarn() {
+  $DIR/grid install zookeeper
+  $DIR/grid install kafka
+}
+
+install_zookeeper() {  # download/unpack ZooKeeper and enable its sample config as zoo.cfg
+  mkdir -p "$DEPLOY_ROOT_DIR"
+  install zookeeper "$DOWNLOAD_ZOOKEEPER" zookeeper-3.4.3
+  cp "$DEPLOY_ROOT_DIR/zookeeper/conf/zoo_sample.cfg" "$DEPLOY_ROOT_DIR/zookeeper/conf/zoo.cfg"
+}
+
+install_yarn() {  # requires $BASE_DIR/conf/yarn-site.xml; fail before downloading if it is absent
+  [ -f "$BASE_DIR/conf/yarn-site.xml" ] || { echo "Missing $BASE_DIR/conf/yarn-site.xml; cannot configure YARN." >&2; exit 1; }
+  mkdir -p "$DEPLOY_ROOT_DIR"
+  install yarn "$DOWNLOAD_YARN" hadoop-2.6.1
+  cp "$BASE_DIR/conf/yarn-site.xml" "$DEPLOY_ROOT_DIR/yarn/etc/hadoop/yarn-site.xml"
+  if [ ! -f "$HOME/.grid/conf/yarn-site.xml" ]; then
+    mkdir -p "$HOME/.grid/conf"
+    cp "$BASE_DIR/conf/yarn-site.xml" "$HOME/.grid/conf/yarn-site.xml"
+  fi
+}
+
+install_kafka() {  # download/unpack Kafka and patch its stop script and default partition count
+  mkdir -p "$DEPLOY_ROOT_DIR"
+  install kafka "$DOWNLOAD_KAFKA" kafka_2.11-0.10.1.1
+  # Use SIGTERM: a broker started under nohup appears to ignore SIGINT, which Kafka's stop script sends since KAFKA-1031.
+  sed -i.bak 's/SIGINT/SIGTERM/g' "$DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh"
+  # Keep the examples simple: new topics default to a single partition.
+  sed -i.bak 's/^num\.partitions *=.*/num.partitions=1/' "$DEPLOY_ROOT_DIR/kafka/config/server.properties"
+}
+
+install() {  # install <deploy-subdir> <download-url> <top-level dir inside the archive>
+  local DESTINATION_DIR="$DEPLOY_ROOT_DIR/$1"
+  local DOWNLOAD_URL=$2
+  local PACKAGE_DIR="$DOWNLOAD_CACHE_DIR/$3"
+  local PACKAGE_FILE="$DOWNLOAD_CACHE_DIR/$(basename "$DOWNLOAD_URL")"
+  if [ -f "$PACKAGE_FILE" ]; then
+    echo "Using previously downloaded file $PACKAGE_FILE"
+  else
+    echo "Downloading $(basename "$DOWNLOAD_URL")..."
+    mkdir -p "$DOWNLOAD_CACHE_DIR"
+    curl -fL -o "${PACKAGE_FILE}.tmp" "$DOWNLOAD_URL"  # -f: fail on HTTP errors instead of caching an error page
+    mv "${PACKAGE_FILE}.tmp" "$PACKAGE_FILE"
+  fi
+  rm -rf "$DESTINATION_DIR" "$PACKAGE_DIR"
+  tar -xf "$PACKAGE_FILE" -C "$DOWNLOAD_CACHE_DIR"
+  mv "$PACKAGE_DIR" "$DESTINATION_DIR"
+}
+
+start_all() {  # start ZooKeeper, YARN and Kafka in that order, each via a separate grid invocation
+  "$DIR/grid" start zookeeper
+  "$DIR/grid" start yarn
+  "$DIR/grid" start kafka
+}
+
+start_all_without_yarn() {  # start ZooKeeper and Kafka only
+  "$DIR/grid" start zookeeper
+  "$DIR/grid" start kafka
+}
+
+start_zookeeper() {  # start ZooKeeper and wait until $ZOOKEEPER_PORT accepts connections
+  if [ -f "$DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh" ]; then
+    cd "$DEPLOY_ROOT_DIR/$SYSTEM"
+    bin/zkServer.sh start
+    wait_for_service "zookeeper" "$ZOOKEEPER_PORT"
+    cd - > /dev/null
+  else
+    echo 'Zookeeper is not installed. Run: scripts/grid install zookeeper'
+  fi
+}
+
+start_yarn() {  # start the ResourceManager, then the NodeManager, waiting for each one's port
+  if [ -f "$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh" ]; then
+    "$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh" start resourcemanager
+    wait_for_service "resourcemanager" "$RESOURCEMANAGER_PORT"
+    "$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh" start nodemanager
+    wait_for_service "nodemanager" "$NODEMANAGER_PORT"
+  else
+    echo 'YARN is not installed. Run: scripts/grid install yarn'
+  fi
+}
+
+start_kafka() {  # start the broker in the background (output to logs/kafka.log) and wait for its port
+  if [ -f "$DEPLOY_ROOT_DIR/$SYSTEM/bin/kafka-server-start.sh" ]; then
+    mkdir -p "$DEPLOY_ROOT_DIR/$SYSTEM/logs"
+    cd "$DEPLOY_ROOT_DIR/$SYSTEM"
+    nohup bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &
+    cd - > /dev/null
+    wait_for_service "kafka" "$KAFKA_PORT"
+  else
+    echo 'Kafka is not installed. Run: scripts/grid install kafka'
+  fi
+}
+
+wait_for_service() {  # wait_for_service <name> <port>: exits 1 after SERVICE_WAIT_TIMEOUT_SEC failed probes
+  local SERVICE_NAME=$1
+  local PORT=$2
+  echo "Waiting for $SERVICE_NAME to start..."
+  local CURRENT_WAIT_TIME=0
+  until nc -w 1 localhost "$PORT" < /dev/null > /dev/null 2>&1; do  # probe by exit status only; never run nc's output
+    printf '.'
+    sleep 1
+    if [ $((++CURRENT_WAIT_TIME)) -eq $SERVICE_WAIT_TIMEOUT_SEC ]; then
+      printf "\nError: timed out while waiting for %s to start.\n" "$SERVICE_NAME"
+      exit 1
+    fi
+  done
+  printf '\n'
+  echo "$SERVICE_NAME has started"
+}
+
+stop_all() {  # stop Kafka, YARN and ZooKeeper (reverse of start_all), each via a separate grid invocation
+  "$DIR/grid" stop kafka
+  "$DIR/grid" stop yarn
+  "$DIR/grid" stop zookeeper
+}
+
+stop_zookeeper() {  # stop ZooKeeper via zkServer.sh
+  if [ -f "$DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh" ]; then
+    cd "$DEPLOY_ROOT_DIR/$SYSTEM"
+    bin/zkServer.sh stop || true # tolerate nonzero exit status (e.g. ZooKeeper not running)
+    cd - > /dev/null
+  else
+    echo 'Zookeeper is not installed. Run: scripts/grid install zookeeper'
+  fi
+}
+
+stop_yarn() {  # stop the ResourceManager and NodeManager daemons
+  if [ -f "$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh" ]; then
+    "$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh" stop resourcemanager
+    "$DEPLOY_ROOT_DIR/$SYSTEM/sbin/yarn-daemon.sh" stop nodemanager
+  else
+    echo 'YARN is not installed. Run: scripts/grid install yarn'
+  fi
+}
+
+stop_kafka() {  # stop the Kafka broker via kafka-server-stop.sh
+  if [ -f "$DEPLOY_ROOT_DIR/$SYSTEM/bin/kafka-server-stop.sh" ]; then
+    cd "$DEPLOY_ROOT_DIR/$SYSTEM"
+    bin/kafka-server-stop.sh || true # tolerate nonzero exit status if Kafka isn't running
+    cd - > /dev/null
+  else
+    echo 'Kafka is not installed. Run: scripts/grid install kafka'
+  fi
+}
+
+# Check arguments
+if [ "$COMMAND" == "bootstrap" ] && test -z "$SYSTEM"; then
+  bootstrap
+  exit 0
+elif [ "$COMMAND" == "standalone" ] && test -z "$SYSTEM"; then
+  standalone
+  exit 0
+elif (test -z "$COMMAND" && test -z "$SYSTEM") \
+  || ( [ "$COMMAND" == "help" ] || test -z "$COMMAND" || test -z "$SYSTEM"); then
+  echo
+  echo "  Usage.."
+  echo
+  echo "  $ grid"
+  echo "  $ grid bootstrap"
+  echo "  $ grid standalone"
+  echo "  $ grid install [yarn|kafka|zookeeper|all]"
+  echo "  $ grid start [yarn|kafka|zookeeper|all]"
+  echo "  $ grid stop [yarn|kafka|zookeeper|all]"
+  echo
+  exit 1
+else
+  echo "EXECUTING: $COMMAND $SYSTEM"
+
+  "$COMMAND"_"$SYSTEM"
+fi
diff --git a/src/main/assembly/samza.xml b/src/main/assembly/samza.xml
new file mode 100644
index 0000000..4cfc895
--- /dev/null
+++ b/src/main/assembly/samza.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  you under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+
+<assembly xmlns="http://maven.apache.org/plugins/assembly/2.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+    <id>dist</id>
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <!-- Ship the job configs; filtering resolves the Maven property placeholders they contain. -->
+        <fileSet>
+            <directory>${basedir}/src/main/config</directory>
+            <includes>
+                <include>*.properties</include>
+            </includes>
+            <outputDirectory>config</outputDirectory>
+            <filtered>true</filtered>
+        </fileSet>
+    </fileSets>
+    <files>
+        <file>
+            <source>${basedir}/src/main/resources/log4j.xml</source>
+            <outputDirectory>lib</outputDirectory>
+        </file>
+        <file>
+            <source>${basedir}/src/main/bash/run-beam-yarn.sh</source>
+            <outputDirectory>bin</outputDirectory>
+            <fileMode>0744</fileMode>
+        </file>
+        <file>
+            <source>${basedir}/src/main/bash/run-beam-container.sh</source>
+            <outputDirectory>bin</outputDirectory>
+            <fileMode>0744</fileMode>
+        </file>
+    </files>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>bin</outputDirectory>
+            <includes>
+                <include>org.apache.samza:samza-shell:tgz:dist:*</include>
+            </includes>
+            <fileMode>0744</fileMode>
+            <unpack>true</unpack>
+        </dependencySet>
+        <dependencySet>
+            <outputDirectory>lib</outputDirectory>
+            <includes>
+                <!-- The project's own jar is subject to these includes as well, so list it explicitly. -->
+                <include>org.apache.samza:samza-beam-examples</include>
+                <include>org.apache.beam:beam-runners-samza</include>
+            </includes>
+            <useTransitiveFiltering>true</useTransitiveFiltering>
+        </dependencySet>
+    </dependencySets>
+</assembly>
\ No newline at end of file
diff --git a/src/main/bash/run-beam-container.sh b/src/main/bash/run-beam-container.sh
new file mode 100755
index 0000000..9d70d5a
--- /dev/null
+++ b/src/main/bash/run-beam-container.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Launch a Beam application's main class ($1) with BeamContainerRunner via samza-shell's run-class.sh.
+# Arguments after the main class are forwarded to it; the class name itself is not (it is not a pipeline option).
+
+# Add -server to JAVA_OPTS unless it is already present.
+[[ $JAVA_OPTS != *-server* ]] && export JAVA_OPTS="$JAVA_OPTS -server"
+
+# Set container ID system property for use in Log4J
+[[ $JAVA_OPTS != *-Dsamza.container.id* && -n "$SAMZA_CONTAINER_ID" ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.id=$SAMZA_CONTAINER_ID"
+
+# Set container name system property for use in Log4J
+[[ $JAVA_OPTS != *-Dsamza.container.name* && -n "$SAMZA_CONTAINER_ID" ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.name=samza-container-$SAMZA_CONTAINER_ID"
+
+# NOTE(review): these --config* flags use Samza command-line syntax; confirm the Beam main's option parser accepts them.
+exec "$(dirname "$0")/run-class.sh" "$1" --config-factory=org.apache.beam.runners.samza.container.ContainerCfgFactory --config-path=none --config app.runner.class=org.apache.beam.runners.samza.container.BeamContainerRunner "${@:2}"
\ No newline at end of file
diff --git a/src/main/bash/run-beam-yarn.sh b/src/main/bash/run-beam-yarn.sh
new file mode 100755
index 0000000..3003e5e
--- /dev/null
+++ b/src/main/bash/run-beam-yarn.sh
@@ -0,0 +1,47 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Run (default), kill, or query the status of a Beam application on Samza.
+# Usage: run-beam-yarn.sh <main-class> [--operation=run|kill|status] [args...]
+
+bin_dir=$(cd "$(dirname "$0")" && pwd)
+base_dir=$(dirname "$bin_dir")
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p "$EXECUTION_PLAN_DIR"
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$bin_dir/log4j-console.xml"
+
+op=$(if [[ "$*" =~ (--operation=)([^ ]*) ]]; then echo "${BASH_REMATCH[2]}"; else echo "run"; fi)
+
+case $op in
+  run)
+    # $1 is the class run-class.sh launches, not a pipeline argument, so only the remaining arguments are forwarded.
+    # task.execute makes the job's containers start through run-beam-container.sh with the same main class.
+    exec "$bin_dir/run-class.sh" "$1" --config task.execute="bin/run-beam-container.sh $1" "${@:2}"
+  ;;
+
+  kill|status)
+    exec "$bin_dir/run-class.sh" org.apache.samza.job.JobRunner "$@"
+  ;;
+
+  *)
+    echo "Unknown operation '$op': expected run, kill or status." >&2
+    exit 1
+  ;;
+esac
\ No newline at end of file
diff --git a/src/main/config/word-count-standalone.properties b/src/main/config/word-count-standalone.properties
new file mode 100644
index 0000000..4c492d5
--- /dev/null
+++ b/src/main/config/word-count-standalone.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Runner: LocalApplicationRunner (standalone deployment, no YARN)
+app.runner.class=org.apache.samza.runtime.LocalApplicationRunner
+
+# Coordination: ZooKeeper-based job coordinator on the local ZooKeeper
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.coordinator.zk.connect=localhost:2181
+
+# Default system: the single local Kafka broker (hence replication factor 1)
+job.default.system=kafka
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
diff --git a/src/main/config/word-count-yarn.properties b/src/main/config/word-count-yarn.properties
new file mode 100644
index 0000000..36d2fcf
--- /dev/null
+++ b/src/main/config/word-count-yarn.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# YARN package path: the tarball built by "mvn package"; the Maven placeholders are resolved only if this file is filtered.
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${project.version}-dist.tar.gz
+
+# Job: submitted to YARN
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+
+# default system
+job.default.system=kafka
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
diff --git a/src/main/java/org/apache/beam/examples/WordCount.java b/src/main/java/org/apache/beam/examples/WordCount.java
new file mode 100644
index 0000000..11e5816
--- /dev/null
+++ b/src/main/java/org/apache/beam/examples/WordCount.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * An example that counts words and includes Beam best practices.
+ *
+ * <p>For a detailed walkthrough of this example, see <a
+ * href="https://beam.apache.org/get-started/wordcount-example/">
+ * https://beam.apache.org/get-started/wordcount-example/ </a>
+ *
+ * <p>Basic concepts: Reading text files; counting a PCollection; writing to text files.
+ *
+ * <p>New Concepts:
+ *
+ * <pre>
+ *   1. Executing a Pipeline both locally and using the selected runner
+ *   2. Using ParDo with static DoFns defined out-of-line
+ *   3. Building a composite transform
+ *   4. Defining your own pipeline options
+ * </pre>
+ *
+ * <p>To execute this pipeline with SamzaRunner:
+ *
+ * <pre>{@code
+ * $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
+ *      -Dexec.args="--inputFile=pom.xml --output=counts --runner=SamzaRunner" -Psamza-runner
+ * }</pre>
+ *
+ * <p>Both {@code --inputFile} and {@code --output} are required; neither has a default.
+ */
+public class WordCount {
+  /** Delimiter regex: one or more consecutive characters that are not Unicode letters. */
+  private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
+
+  /**
+   * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
+   * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it to
+   * a ParDo in the pipeline. It also reports the number of blank lines ({@code emptyLines}) and
+   * the distribution of line lengths ({@code lineLenDistro}) as Beam metrics.
+   */
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
+    private final Distribution lineLenDist =
+        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
+
+    @ProcessElement
+    public void processElement(@Element String element, OutputReceiver<String> receiver) {
+      lineLenDist.update(element.length());
+      if (element.trim().isEmpty()) {
+        emptyLines.inc();
+      }
+
+      // Split the line into words. Leading or trailing delimiters (or an empty line) produce
+      // empty tokens; those are skipped rather than emitted.
+      String[] words = element.split(TOKENIZER_PATTERN, -1);
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          receiver.output(word);
+        }
+      }
+    }
+  }
+
+  /** A SimpleFunction that formats a word and its count as a {@code "word: count"} line. */
+  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
+    @Override
+    public String apply(KV<String, Long> input) {
+      return input.getKey() + ": " + input.getValue();
+    }
+  }
+
+  /**
+   * A PTransform that converts a PCollection containing lines of text into a PCollection of
+   * (word, occurrence count) pairs; turning them into text is done by {@link FormatAsTextFn}.
+   *
+   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
+   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
+   * modular testing, and an improved monitoring experience.
+   */
+  public static class CountWords
+      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
+    @Override
+    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
+      // Convert lines of text into individual words.
+      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
+
+      // Count the number of times each word occurs.
+      return words.apply(Count.perElement());
+    }
+  }
+
+  /**
+   * Options supported by {@link WordCount}.
+   *
+   * <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments to
+   * be processed by the command-line parser, and specify default values for them. You can then
+   * access the options values in your pipeline code.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  public interface WordCountOptions extends PipelineOptions {
+    /**
+     * Set this required option to the file or glob to read input lines from. It has no default;
+     * options validation rejects the run if it is omitted.
+     */
+    @Description("Path of the file to read from")
+    @Required
+    String getInputFile();
+
+    void setInputFile(String value);
+
+    /** Set this required option to specify where to write the output. */
+    @Description("Path of the file to write to")
+    @Required
+    String getOutput();
+
+    void setOutput(String value);
+  }
+
+  /** Runs the word-count pipeline for {@code options} and blocks until it finishes. */
+  static void runWordCount(WordCountOptions options) {
+    Pipeline p = Pipeline.create(options);
+
+    // Concepts #2 and #3: Our pipeline applies the composite CountWords transform (which runs the
+    // out-of-line ExtractWordsFn), then formats each count via MapElements and FormatAsTextFn.
+    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+        .apply(new CountWords())
+        .apply(MapElements.via(new FormatAsTextFn()))
+        .apply("WriteCounts", TextIO.write().to(options.getOutput()).withoutSharding());
+
+    p.run().waitUntilFinish();
+  }
+
+  /** Parses and validates the command-line options, then runs the pipeline. */
+  public static void main(String[] args) {
+    WordCountOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
+    // Fixed job name; this overrides any --jobName passed on the command line.
+    options.setJobName("word-count");
+
+    runWordCount(options);
+  }
+}
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
new file mode 100644
index 0000000..805d5ca
--- /dev/null
+++ b/src/main/resources/log4j.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <!--
+    Log4j 1.x configuration. The ${samza.log.dir} and ${samza.container.name} placeholders are
+    resolved by log4j from JVM system properties; presumably the Samza launch scripts set them
+    (confirm for your deployment).
+  -->
+
+  <!-- Samza's JMX appender; attached to the root logger below. -->
+  <appender name="jmx" class="org.apache.samza.logging.log4j.JmxAppender" />
+
+  <!-- Main container log: rolls over at 256MB and keeps up to 20 backup files. -->
+  <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
+    <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
+    <param name="MaxFileSize" value="256MB" />
+    <param name="MaxBackupIndex" value="20" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <!-- timestamp [thread] short logger name [level] message -->
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+    </layout>
+  </appender>
+
+  <!-- Startup log: same layout, rolls over at 256MB and keeps a single backup file. -->
+  <appender name="StartupAppender" class="org.apache.log4j.RollingFileAppender">
+    <param name="File" value="${samza.log.dir}/${samza.container.name}-startup.log" />
+    <param name="MaxFileSize" value="256MB" />
+    <param name="MaxBackupIndex" value="1" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+    </layout>
+  </appender>
+
+  <!--
+    Events logged to STARTUP_LOGGER go only to the startup file: additivity="false" keeps them
+    out of the root logger's appenders.
+  -->
+  <logger name="STARTUP_LOGGER" additivity="false">
+    <level value="info" />
+    <appender-ref ref="StartupAppender"/>
+  </logger>
+
+  <!-- Root logger: INFO and above, written to the main log and the JMX appender. -->
+  <root>
+    <level value="info" />
+    <appender-ref ref="RollingAppender"/>
+    <appender-ref ref="jmx" />
+  </root>
+</log4j:configuration>