You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/09 01:25:59 UTC

[18/33] samza-hello-samza git commit: SAMZA-1237: Add support for standalone mode to wikipedia application

SAMZA-1237: Add support for standalone mode to wikipedia application

Author: Bharath Kumarasubramanian <bk...@linkedin.com>

Reviewers: Jacob Maes <jm...@linkedin.com>

Closes #13 from bharathkk/latest


Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/591aaebc
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/591aaebc
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/591aaebc

Branch: refs/heads/master
Commit: 591aaebc4ffca24a193b068fdc526e20cc57d06b
Parents: 3d0e919
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Fri May 12 07:51:59 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri May 12 07:51:59 2017 -0700

----------------------------------------------------------------------
 .gitignore                                      |  1 +
 bin/grid                                        | 27 ++++++++-
 bin/run-wikipedia-standalone-application.sh     | 30 ++++++++++
 src/main/assembly/src.xml                       | 52 ++++-------------
 ...ikipedia-application-local-runner.properties | 60 ++++++++++++++++++++
 .../application/WikipediaApplication.java       |  2 +-
 .../WikipediaZkLocalApplication.java            | 47 +++++++++++++++
 7 files changed, 177 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 849ce6a..f31af00 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,3 +30,4 @@ deploy
 *.swp
 build/
 .gradle/
+state

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index 7d2112b..5f715b5 100755
--- a/bin/grid
+++ b/bin/grid
@@ -39,7 +39,7 @@ DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.
 DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
 DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
 
-SERVICE_WAIT_TIMEOUT_SEC=10
+SERVICE_WAIT_TIMEOUT_SEC=20
 ZOOKEEPER_PORT=2181
 RESOURCEMANAGER_PORT=8032
 NODEMANAGER_PORT=8042
@@ -55,6 +55,16 @@ bootstrap() {
   exit 0
 }
 
+standalone() {
+  echo "Setting up the ystem..."
+  stop_all
+  rm -rf "$DEPLOY_ROOT_DIR"
+  mkdir "$DEPLOY_ROOT_DIR"
+  install_all_without_yarn
+  start_all_without_yarn
+  exit 0
+}
+
 install_all() {
   $DIR/grid install samza
   $DIR/grid install zookeeper
@@ -62,6 +72,12 @@ install_all() {
   $DIR/grid install kafka
 }
 
+install_all_without_yarn() {
+  $DIR/grid install samza
+  $DIR/grid install zookeeper
+  $DIR/grid install kafka
+}
+
 install_samza() {
   mkdir -p "$DEPLOY_ROOT_DIR"
   if [ -d "$DOWNLOAD_CACHE_DIR/samza/.git" ]; then
@@ -128,6 +144,11 @@ start_all() {
   $DIR/grid start kafka
 }
 
+start_all_without_yarn() {
+  $DIR/grid start zookeeper
+  $DIR/grid start kafka
+}
+
 start_zookeeper() {
   if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh ]; then
     cd $DEPLOY_ROOT_DIR/$SYSTEM
@@ -218,6 +239,9 @@ stop_kafka() {
 if [ "$COMMAND" == "bootstrap" ] && test -z "$SYSTEM"; then
   bootstrap
   exit 0
+elif [ "$COMMAND" == "standalone" ] && test -z "$SYSTEM"; then
+  standalone
+  exit 0
 elif (test -z "$COMMAND" && test -z "$SYSTEM") \
   || ( [ "$COMMAND" == "help" ] || test -z "$COMMAND" || test -z "$SYSTEM"); then
   echo
@@ -225,6 +249,7 @@ elif (test -z "$COMMAND" && test -z "$SYSTEM") \
   echo
   echo "  $ grid"
   echo "  $ grid bootstrap"
+  echo "  $ grid standalone"
   echo "  $ grid install [yarn|kafka|zookeeper|samza|all]"
   echo "  $ grid start [yarn|kafka|zookeeper|all]"
   echo "  $ grid stop [yarn|kafka|zookeeper|all]"

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/bin/run-wikipedia-standalone-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-wikipedia-standalone-application.sh b/bin/run-wikipedia-standalone-application.sh
new file mode 100755
index 0000000..f750e2b
--- /dev/null
+++ b/bin/run-wikipedia-standalone-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p $EXECUTION_PLAN_DIR
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh samza.examples.wikipedia.application.WikipediaZkLocalApplication "$@"

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index ca90ebf..69cbbbe 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -28,53 +28,25 @@
         <include>NOTICE*</include>
       </includes>
     </fileSet>
+    <!-- Package every src/main/config/*.properties file into config/ of the distribution. -->
+    <fileSet>
+      <directory>${basedir}/src/main/config</directory>
+      <includes>
+        <include>*.properties</include>
+      </includes>
+      <outputDirectory>config</outputDirectory>
+      <!-- filtered=true enables ${...} variable expansion in the copied .properties files,
+      so values such as the YARN package path resolve to the correct location on any machine -->
+      <filtered>true</filtered>
+    </fileSet>
   </fileSets>
   <files>
     <file>
       <source>${basedir}/src/main/resources/log4j.xml</source>
       <outputDirectory>lib</outputDirectory>
     </file>
-    <!-- filtered=true, so we do variable expansion so the yarn package path
-      always points to the correct spot on any machine -->
-    <file>
-      <source>${basedir}/src/main/config/wikipedia-feed.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
-    </file>
-    <file>
-      <source>${basedir}/src/main/config/wikipedia-parser.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
-    </file>
-    <file>
-      <source>${basedir}/src/main/config/wikipedia-stats.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
-    </file>
     <file>
-      <source>${basedir}/src/main/config/wikipedia-application.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
-    </file>
-    <file>
-      <source>${basedir}/src/main/config/tumbling-pageview-counter.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
-    </file>
-    <file>
-      <source>${basedir}/src/main/config/pageview-sessionizer.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
-    </file>
-    <file>
-      <source>${basedir}/src/main/config/pageview-filter.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
-    </file>
-    <file>
-      <source>${basedir}/src/main/config/pageview-adclick-joiner.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
+      <!-- Ship the standalone (ZooKeeper) launcher next to run-class.sh and log4j-console.xml,
+      which it expects to find in its own directory. -->
+      <source>${basedir}/bin/run-wikipedia-standalone-application.sh</source>
+      <outputDirectory>bin</outputDirectory>
     </file>
   </files>
   <dependencySets>

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/config/wikipedia-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties
new file mode 100644
index 0000000..965a131
--- /dev/null
+++ b/src/main/config/wikipedia-application-local-runner.properties
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.name=wikipedia-application
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.default.system=kafka
+coordinator.zk.connect=localhost:2181
+
+# Task/Application
+app.processor-id-generator.class=org.apache.samza.runtime.UUIDGenerator
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
+
+# Serializers (referenced by name from the stream and store serde settings in this file)
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
+systems.kafka.default.stream.samza.msg.serde=json
+
+# Streams
+streams.en-wikipedia.samza.system=wikipedia
+streams.en-wikipedia.samza.physical.name=#en.wikipedia
+
+streams.en-wiktionary.samza.system=wikipedia
+streams.en-wiktionary.samza.physical.name=#en.wiktionary
+
+streams.en-wikinews.samza.system=wikipedia
+streams.en-wikinews.samza.physical.name=#en.wikinews
+
+# Key-value storage
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
+stores.wikipedia-stats.key.serde=string
+stores.wikipedia-stats.msg.serde=integer
+

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index b0779db..3432e3d 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -88,7 +88,7 @@ public class WikipediaApplication implements StreamApplication {
     OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats = graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m);
 
     // Merge inputs
-    MessageStream<WikipediaFeedEvent> allWikipediaEvents = wikipediaEvents.merge(ImmutableList.of(wiktionaryEvents, wikiNewsEvents));
+    MessageStream<WikipediaFeedEvent> allWikipediaEvents = MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
 
     // Parse, update stats, prepare output, and send
     allWikipediaEvents.map(WikipediaParser::parseEvent)

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
new file mode 100644
index 0000000..8e978bc
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.application;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.Config;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.util.CommandLine;
+import org.apache.samza.util.Util;
+
+
+/**
+ * Standalone entry point for {@link WikipediaApplication}. Rather than being submitted to a
+ * cluster, the application runs inside this JVM through a {@link LocalApplicationRunner}; it is
+ * intended to be launched with a ZooKeeper-coordinated configuration. The process blocks until
+ * the application finishes, and the job can also be ended by killing the process.
+ */
+public class WikipediaZkLocalApplication {
+
+  /**
+   * Loads the job config from the command-line arguments, passes it through
+   * {@code Util.rewriteConfig}, then runs {@link WikipediaApplication} locally and waits for it
+   * to finish.
+   *
+   * @param args command-line arguments parsed by Samza's {@link CommandLine}
+   * @throws Exception any failure from config loading or from the runner is propagated
+   */
+  public static void main(String[] args) throws Exception {
+    CommandLine commandLine = new CommandLine();
+    OptionSet parsedOptions = commandLine.parser().parse(args);
+    Config config = Util.rewriteConfig(commandLine.loadConfig(parsedOptions));
+
+    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
+    localRunner.run(new WikipediaApplication());
+    localRunner.waitForFinish();
+  }
+}