You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2019/01/09 00:08:22 UTC

[samza-beam-examples] branch master updated: Add yarn examples

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza-beam-examples.git


The following commit(s) were added to refs/heads/master by this push:
     new a4addbd  Add yarn examples
a4addbd is described below

commit a4addbd148621cea54f2ebbb9ba651a90317e6d0
Author: xinyuiscool <xi...@gmail.com>
AuthorDate: Tue Jan 8 15:51:25 2019 -0800

    Add yarn examples
---
 conf/yarn-site.xml                                 | 33 ++++++++++++++++++++++
 pom.xml                                            | 16 +++++++++--
 src/main/assembly/samza.xml                        |  1 +
 src/main/bash/run-beam-container.sh                |  2 +-
 src/main/bash/run-beam-yarn.sh                     |  4 ++-
 ...standalone.properties => standalone.properties} |  0
 ...{word-count-yarn.properties => yarn.properties} |  1 +
 .../org/apache/beam/examples/KafkaWordCount.java   | 17 +++++++++--
 .../java/org/apache/beam/examples/WordCount.java   | 18 +++++++++++-
 9 files changed, 84 insertions(+), 8 deletions(-)

diff --git a/conf/yarn-site.xml b/conf/yarn-site.xml
new file mode 100644
index 0000000..9028590
--- /dev/null
+++ b/conf/yarn-site.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at 
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an 
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<configuration>
+  <property>
+    <name>yarn.resourcemanager.scheduler.class</name>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler</value>
+  </property>
+  <property>
+    <name>yarn.nodemanager.vmem-pmem-ratio</name>
+    <value>10</value>
+  </property>
+  <property>
+    <name>yarn.resourcemanager.hostname</name>
+    <value>127.0.0.1</value>
+  </property>
+</configuration>
diff --git a/pom.xml b/pom.xml
index 2db2df0..d78812a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,6 +180,18 @@
       <version>${samza.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-log4j</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-yarn_2.11</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+
     <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
     <dependency>
       <groupId>joda-time</groupId>
@@ -215,7 +227,7 @@
       <artifactId>hamcrest-core</artifactId>
       <version>${hamcrest.version}</version>
     </dependency>
-    
+
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-library</artifactId>
@@ -235,7 +247,7 @@
       <version>${beam.version}</version>
       <scope>test</scope>
     </dependency>
-    
+
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
diff --git a/src/main/assembly/samza.xml b/src/main/assembly/samza.xml
index 570b07f..a4685b7 100644
--- a/src/main/assembly/samza.xml
+++ b/src/main/assembly/samza.xml
@@ -70,6 +70,7 @@
             <includes>
                 <include>beam-runners-samza</include>
                 <include>org.apache.samza:samza-beam-examples</include>
+                <include>org.apache.samza:samza-yarn_2.11</include>
                 <include>org.apache.samza:samza-log4j</include>
             </includes>
             <useTransitiveFiltering>true</useTransitiveFiltering>
diff --git a/src/main/bash/run-beam-container.sh b/src/main/bash/run-beam-container.sh
index 9d70d5a..c58a1f1 100755
--- a/src/main/bash/run-beam-container.sh
+++ b/src/main/bash/run-beam-container.sh
@@ -31,4 +31,4 @@ cd $base_dir
 base_dir=`pwd`
 cd $home_dir
 
-exec $(dirname $0)/run-class.sh $1 --config-factory=org.apache.beam.runners.samza.container.ContainerCfgFactory --config-path=none --config app.runner.class=org.apache.beam.runners.samza.container.BeamContainerRunner "$@"
\ No newline at end of file
+exec $(dirname $0)/run-class.sh $1 --runner=org.apache.beam.runners.samza.SamzaRunner --configFactory=org.apache.beam.runners.samza.container.ContainerCfgFactory "${@:2}"
\ No newline at end of file
diff --git a/src/main/bash/run-beam-yarn.sh b/src/main/bash/run-beam-yarn.sh
index b0ada05..6d65075 100755
--- a/src/main/bash/run-beam-yarn.sh
+++ b/src/main/bash/run-beam-yarn.sh
@@ -29,9 +29,11 @@ mkdir -p $EXECUTION_PLAN_DIR
 
 op=$(if [[ "$@" =~ (--operation=)([^ ]*) ]]; then echo "${BASH_REMATCH[2]}"; else echo "run"; fi)
 
+cmd="{\"task.execute\":\"bin/run-beam-container.sh $@\"}"
+
 case $op in
   run)
-    exec $(dirname $0)/run-class.sh $1 --config task.execute="bin/run-beam-container.sh $1" "${@:2}"
+    exec $(dirname $0)/run-class.sh $1 --runner=org.apache.beam.runners.samza.SamzaRunner --configOverride="$cmd" "${@:2}"
   ;;
 
   kill)
diff --git a/src/main/config/word-count-standalone.properties b/src/main/config/standalone.properties
similarity index 100%
rename from src/main/config/word-count-standalone.properties
rename to src/main/config/standalone.properties
diff --git a/src/main/config/word-count-yarn.properties b/src/main/config/yarn.properties
similarity index 94%
rename from src/main/config/word-count-yarn.properties
rename to src/main/config/yarn.properties
index 36d2fcf..5f8181a 100644
--- a/src/main/config/word-count-yarn.properties
+++ b/src/main/config/yarn.properties
@@ -20,6 +20,7 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-
 
 # Job
 job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.container.count=2
 
 # default system
 job.default.system=kafka
diff --git a/src/main/java/org/apache/beam/examples/KafkaWordCount.java b/src/main/java/org/apache/beam/examples/KafkaWordCount.java
index a4e5001..8af9415 100644
--- a/src/main/java/org/apache/beam/examples/KafkaWordCount.java
+++ b/src/main/java/org/apache/beam/examples/KafkaWordCount.java
@@ -58,14 +58,22 @@ import org.joda.time.Duration;
  * }</pre>
  *
  * <p>To execute the example in distributed manner, use mvn to package it first:
+ * (for YARN deployment, first remove the .waitUntilFinish() call in the code)
  * <pre>{@code
- * $ mkdir -p deploy/examples
+ * $ mkdir -p deploy/examples
  * $ mvn package && tar -xvf target/samza-beam-examples-0.1-dist.tar.gz -C deploy/examples/
  * }</pre>
  *
- * <p>TO run in standalone with zookeeper:
+ * <p>To run in standalone with zookeeper:
+ * (a large maxSourceParallelism puts each partition in its own task)
  * <pre>{@code
- * $ deploy/examples/bin/run-beam-standalone.sh org.apache.beam.examples.KafkaWordCount --configFilePath=$PWD/deploy/examples/config/word-count-standalone.properties --maxSourceParallelism=1024
+ * $ deploy/examples/bin/run-beam-standalone.sh org.apache.beam.examples.KafkaWordCount --configFilePath=$PWD/deploy/examples/config/standalone.properties --maxSourceParallelism=1024
+ * }</pre>
+ *
+ * <p>To run in yarn:
+ * (a large maxSourceParallelism puts each partition in its own task)
+ * <pre>{@code
+ * $ deploy/examples/bin/run-beam-yarn.sh org.apache.beam.examples.KafkaWordCount --configFilePath=$PWD/deploy/examples/config/yarn.properties --maxSourceParallelism=1024
  * }</pre>
  *
  * <p>To produce some test data:
@@ -119,6 +127,9 @@ public class KafkaWordCount {
             .withKeySerializer(StringSerializer.class)
             .withValueSerializer(StringSerializer.class));
 
+    // For YARN deployment there is no need to wait after submitting
+    // the job, so remove waitUntilFinish() and call only
+    // p.run();
     p.run().waitUntilFinish();
   }
 }
diff --git a/src/main/java/org/apache/beam/examples/WordCount.java b/src/main/java/org/apache/beam/examples/WordCount.java
index 96751ae..a647c0a 100644
--- a/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/src/main/java/org/apache/beam/examples/WordCount.java
@@ -61,16 +61,29 @@ import org.apache.beam.sdk.values.PCollection;
  * }</pre>
  *
  * <p>To execute the example in distributed manner, use mvn to package it first:
+ * (for YARN deployment, first remove the .waitUntilFinish() call in the code)
  * <pre>{@code
  * $ mkdir -p deploy/examples
  * $ mvn package && tar -xvf target/samza-beam-examples-0.1-dist.tar.gz -C deploy/examples/
  * }</pre>
  *
  * <p>To execute this example in standalone with zookeeper:
+ * (--maxSourceParallelism=2 splits the input into 2 parts)
  * <pre>{@code
- * $ deploy/examples/bin/run-beam-standalone.sh org.apache.beam.examples.WordCount --configFilePath=$PWD/deploy/examples/config/word-count-standalone.properties --inputFile=pom.xml --output=word-counts.txt
+ * $ deploy/examples/bin/run-beam-standalone.sh org.apache.beam.examples.WordCount \
+ *     --configFilePath=$PWD/deploy/examples/config/standalone.properties \
+ *     --inputFile=$PWD/pom.xml --output=word-counts.txt \
+ *     --maxSourceParallelism=2
  * }</pre>
  *
+ * <p>To execute this example in yarn:
+ * (--maxSourceParallelism=2 splits the input into 2 parts)
+ * <pre>{@code
+ * $ deploy/examples/bin/run-beam-yarn.sh org.apache.beam.examples.WordCount \
+ *     --configFilePath=$PWD/deploy/examples/config/yarn.properties \
+ *     --inputFile=$PWD/pom.xml \
+ *     --output=/tmp/word-counts.txt --maxSourceParallelism=2
+ * }</pre>
  */
 public class WordCount {
   private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
@@ -174,6 +187,9 @@ public class WordCount {
         .apply(MapElements.via(new FormatAsTextFn()))
         .apply("WriteCounts", TextIO.write().to(options.getOutput()).withoutSharding());
 
+    // For YARN deployment there is no need to wait after submitting
+    // the job, so remove waitUntilFinish() and call only
+    // p.run();
     p.run().waitUntilFinish();
   }