You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2019/01/09 00:08:22 UTC
[samza-beam-examples] branch master updated: Add yarn examples
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza-beam-examples.git
The following commit(s) were added to refs/heads/master by this push:
new a4addbd Add yarn examples
a4addbd is described below
commit a4addbd148621cea54f2ebbb9ba651a90317e6d0
Author: xinyuiscool <xi...@gmail.com>
AuthorDate: Tue Jan 8 15:51:25 2019 -0800
Add yarn examples
---
conf/yarn-site.xml | 33 ++++++++++++++++++++++
pom.xml | 16 +++++++++--
src/main/assembly/samza.xml | 1 +
src/main/bash/run-beam-container.sh | 2 +-
src/main/bash/run-beam-yarn.sh | 4 ++-
...standalone.properties => standalone.properties} | 0
...{word-count-yarn.properties => yarn.properties} | 1 +
.../org/apache/beam/examples/KafkaWordCount.java | 17 +++++++++--
.../java/org/apache/beam/examples/WordCount.java | 18 +++++++++++-
9 files changed, 84 insertions(+), 8 deletions(-)
diff --git a/conf/yarn-site.xml b/conf/yarn-site.xml
new file mode 100644
index 0000000..9028590
--- /dev/null
+++ b/conf/yarn-site.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<configuration>
+ <property>
+ <name>yarn.resourcemanager.scheduler.class</name>
+ <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler</value>
+ </property>
+ <property>
+ <name>yarn.nodemanager.vmem-pmem-ratio</name>
+ <value>10</value>
+ </property>
+ <property>
+ <name>yarn.resourcemanager.hostname</name>
+ <value>127.0.0.1</value>
+ </property>
+</configuration>
diff --git a/pom.xml b/pom.xml
index 2db2df0..d78812a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,6 +180,18 @@
<version>${samza.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-log4j</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-yarn_2.11</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+
<!-- Dependencies below this line are specific dependencies needed by the examples code. -->
<dependency>
<groupId>joda-time</groupId>
@@ -215,7 +227,7 @@
<artifactId>hamcrest-core</artifactId>
<version>${hamcrest.version}</version>
</dependency>
-
+
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
@@ -235,7 +247,7 @@
<version>${beam.version}</version>
<scope>test</scope>
</dependency>
-
+
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git a/src/main/assembly/samza.xml b/src/main/assembly/samza.xml
index 570b07f..a4685b7 100644
--- a/src/main/assembly/samza.xml
+++ b/src/main/assembly/samza.xml
@@ -70,6 +70,7 @@
<includes>
<include>beam-runners-samza</include>
<include>org.apache.samza:samza-beam-examples</include>
+ <include>org.apache.samza:samza-yarn_2.11</include>
<include>org.apache.samza:samza-log4j</include>
</includes>
<useTransitiveFiltering>true</useTransitiveFiltering>
diff --git a/src/main/bash/run-beam-container.sh b/src/main/bash/run-beam-container.sh
index 9d70d5a..c58a1f1 100755
--- a/src/main/bash/run-beam-container.sh
+++ b/src/main/bash/run-beam-container.sh
@@ -31,4 +31,4 @@ cd $base_dir
base_dir=`pwd`
cd $home_dir
-exec $(dirname $0)/run-class.sh $1 --config-factory=org.apache.beam.runners.samza.container.ContainerCfgFactory --config-path=none --config app.runner.class=org.apache.beam.runners.samza.container.BeamContainerRunner "$@"
\ No newline at end of file
+exec $(dirname $0)/run-class.sh $1 --runner=org.apache.beam.runners.samza.SamzaRunner --configFactory=org.apache.beam.runners.samza.container.ContainerCfgFactory "${@:2}"
\ No newline at end of file
diff --git a/src/main/bash/run-beam-yarn.sh b/src/main/bash/run-beam-yarn.sh
index b0ada05..6d65075 100755
--- a/src/main/bash/run-beam-yarn.sh
+++ b/src/main/bash/run-beam-yarn.sh
@@ -29,9 +29,11 @@ mkdir -p $EXECUTION_PLAN_DIR
op=$(if [[ "$@" =~ (--operation=)([^ ]*) ]]; then echo "${BASH_REMATCH[2]}"; else echo "run"; fi)
+cmd="{\"task.execute\":\"bin/run-beam-container.sh $@\"}"
+
case $op in
run)
- exec $(dirname $0)/run-class.sh $1 --config task.execute="bin/run-beam-container.sh $1" "${@:2}"
+ exec $(dirname $0)/run-class.sh $1 --runner=org.apache.beam.runners.samza.SamzaRunner --configOverride="$cmd" "${@:2}"
;;
kill)
diff --git a/src/main/config/word-count-standalone.properties b/src/main/config/standalone.properties
similarity index 100%
rename from src/main/config/word-count-standalone.properties
rename to src/main/config/standalone.properties
diff --git a/src/main/config/word-count-yarn.properties b/src/main/config/yarn.properties
similarity index 94%
rename from src/main/config/word-count-yarn.properties
rename to src/main/config/yarn.properties
index 36d2fcf..5f8181a 100644
--- a/src/main/config/word-count-yarn.properties
+++ b/src/main/config/yarn.properties
@@ -20,6 +20,7 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.container.count=2
# default system
job.default.system=kafka
diff --git a/src/main/java/org/apache/beam/examples/KafkaWordCount.java b/src/main/java/org/apache/beam/examples/KafkaWordCount.java
index a4e5001..8af9415 100644
--- a/src/main/java/org/apache/beam/examples/KafkaWordCount.java
+++ b/src/main/java/org/apache/beam/examples/KafkaWordCount.java
@@ -58,14 +58,22 @@ import org.joda.time.Duration;
* }</pre>
*
* <p>To execute the example in distributed manner, use mvn to package it first:
+ * (remove .waitUntilFinish() in the code for yarn deployment)
* <pre>{@code
- * $ mkdir -p deploy/examples
+ * $ mkdir -p deploy/examples
* $ mvn package && tar -xvf target/samza-beam-examples-0.1-dist.tar.gz -C deploy/examples/
* }</pre>
*
- * <p>TO run in standalone with zookeeper:
+ * <p>To run in standalone with zookeeper:
+ * (a large maxSourceParallelism ensures each partition is processed in its own task)
* <pre>{@code
- * $ deploy/examples/bin/run-beam-standalone.sh org.apache.beam.examples.KafkaWordCount --configFilePath=$PWD/deploy/examples/config/word-count-standalone.properties --maxSourceParallelism=1024
+ * $ deploy/examples/bin/run-beam-standalone.sh org.apache.beam.examples.KafkaWordCount --configFilePath=$PWD/deploy/examples/config/standalone.properties --maxSourceParallelism=1024
+ * }</pre>
+ *
+ * <p>To run in yarn:
+ * (a large maxSourceParallelism ensures each partition is processed in its own task)
+ * <pre>{@code
+ * $ deploy/examples/bin/run-beam-yarn.sh org.apache.beam.examples.KafkaWordCount --configFilePath=$PWD/deploy/examples/config/yarn.properties --maxSourceParallelism=1024
* }</pre>
*
* <p>To produce some test data:
@@ -119,6 +127,9 @@ public class KafkaWordCount {
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class));
+ //For yarn, there is no need to wait after submitting the job,
+ //so waitUntilFinish() is unnecessary; call p.run() on its
+ //own instead.
p.run().waitUntilFinish();
}
}
diff --git a/src/main/java/org/apache/beam/examples/WordCount.java b/src/main/java/org/apache/beam/examples/WordCount.java
index 96751ae..a647c0a 100644
--- a/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/src/main/java/org/apache/beam/examples/WordCount.java
@@ -61,16 +61,29 @@ import org.apache.beam.sdk.values.PCollection;
* }</pre>
*
* <p>To execute the example in distributed manner, use mvn to package it first:
+ * (remove .waitUntilFinish() in the code for yarn deployment)
* <pre>{@code
* $ mkdir -p deploy/examples
* $ mvn package && tar -xvf target/samza-beam-examples-0.1-dist.tar.gz -C deploy/examples/
* }</pre>
*
* <p>To execute this example in standalone with zookeeper:
+ * (--maxSourceParallelism=2 splits the input into 2 parts)
* <pre>{@code
- * $ deploy/examples/bin/run-beam-standalone.sh org.apache.beam.examples.WordCount --configFilePath=$PWD/deploy/examples/config/word-count-standalone.properties --inputFile=pom.xml --output=word-counts.txt
+ * $ deploy/examples/bin/run-beam-standalone.sh org.apache.beam.examples.WordCount \
+ * --configFilePath=$PWD/deploy/examples/config/standalone.properties \
+ * --inputFile=$PWD/pom.xml --output=word-counts.txt \
+ * --maxSourceParallelism=2
* }</pre>
*
+ * <p>To execute this example in yarn:
+ * (--maxSourceParallelism=2 splits the input into 2 parts)
+ * <pre>{@code
+ * $ deploy/examples/bin/run-beam-yarn.sh org.apache.beam.examples.WordCount \
+ * --configFilePath=$PWD/deploy/examples/config/yarn.properties \
+ * --inputFile=$PWD/pom.xml \
+ * --output=/tmp/word-counts.txt --maxSourceParallelism=2
+ * }</pre>
*/
public class WordCount {
private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
@@ -174,6 +187,9 @@ public class WordCount {
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.write().to(options.getOutput()).withoutSharding());
+ //For yarn, there is no need to wait after submitting the job,
+ //so waitUntilFinish() is unnecessary; call p.run() on its
+ //own instead.
p.run().waitUntilFinish();
}