You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/05/12 14:52:22 UTC
samza-hello-samza git commit: SAMZA-1237: Add support for standalone
mode to wikipedia application
Repository: samza-hello-samza
Updated Branches:
refs/heads/latest 3d0e919e6 -> 591aaebc4
SAMZA-1237: Add support for standalone mode to wikipedia application
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Reviewers: Jacob Maes <jm...@linkedin.com>
Closes #13 from bharathkk/latest
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/591aaebc
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/591aaebc
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/591aaebc
Branch: refs/heads/latest
Commit: 591aaebc4ffca24a193b068fdc526e20cc57d06b
Parents: 3d0e919
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Fri May 12 07:51:59 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri May 12 07:51:59 2017 -0700
----------------------------------------------------------------------
.gitignore | 1 +
bin/grid | 27 ++++++++-
bin/run-wikipedia-standalone-application.sh | 30 ++++++++++
src/main/assembly/src.xml | 52 ++++-------------
...ikipedia-application-local-runner.properties | 60 ++++++++++++++++++++
.../application/WikipediaApplication.java | 2 +-
.../WikipediaZkLocalApplication.java | 47 +++++++++++++++
7 files changed, 177 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 849ce6a..f31af00 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,3 +30,4 @@ deploy
*.swp
build/
.gradle/
+state
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index 7d2112b..5f715b5 100755
--- a/bin/grid
+++ b/bin/grid
@@ -39,7 +39,7 @@ DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.
DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
-SERVICE_WAIT_TIMEOUT_SEC=10
+SERVICE_WAIT_TIMEOUT_SEC=20
ZOOKEEPER_PORT=2181
RESOURCEMANAGER_PORT=8032
NODEMANAGER_PORT=8042
@@ -55,6 +55,16 @@ bootstrap() {
exit 0
}
+standalone() {
+ echo "Setting up the system..."
+ stop_all
+ rm -rf "$DEPLOY_ROOT_DIR"
+ mkdir "$DEPLOY_ROOT_DIR"
+ install_all_without_yarn
+ start_all_without_yarn
+ exit 0
+}
+
install_all() {
$DIR/grid install samza
$DIR/grid install zookeeper
@@ -62,6 +72,12 @@ install_all() {
$DIR/grid install kafka
}
+install_all_without_yarn() {
+ $DIR/grid install samza
+ $DIR/grid install zookeeper
+ $DIR/grid install kafka
+}
+
install_samza() {
mkdir -p "$DEPLOY_ROOT_DIR"
if [ -d "$DOWNLOAD_CACHE_DIR/samza/.git" ]; then
@@ -128,6 +144,11 @@ start_all() {
$DIR/grid start kafka
}
+start_all_without_yarn() {
+ $DIR/grid start zookeeper
+ $DIR/grid start kafka
+}
+
start_zookeeper() {
if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh ]; then
cd $DEPLOY_ROOT_DIR/$SYSTEM
@@ -218,6 +239,9 @@ stop_kafka() {
if [ "$COMMAND" == "bootstrap" ] && test -z "$SYSTEM"; then
bootstrap
exit 0
+elif [ "$COMMAND" == "standalone" ] && test -z "$SYSTEM"; then
+ standalone
+ exit 0
elif (test -z "$COMMAND" && test -z "$SYSTEM") \
|| ( [ "$COMMAND" == "help" ] || test -z "$COMMAND" || test -z "$SYSTEM"); then
echo
@@ -225,6 +249,7 @@ elif (test -z "$COMMAND" && test -z "$SYSTEM") \
echo
echo " $ grid"
echo " $ grid bootstrap"
+ echo " $ grid standalone"
echo " $ grid install [yarn|kafka|zookeeper|samza|all]"
echo " $ grid start [yarn|kafka|zookeeper|all]"
echo " $ grid stop [yarn|kafka|zookeeper|all]"
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/bin/run-wikipedia-standalone-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-wikipedia-standalone-application.sh b/bin/run-wikipedia-standalone-application.sh
new file mode 100755
index 0000000..f750e2b
--- /dev/null
+++ b/bin/run-wikipedia-standalone-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p $EXECUTION_PLAN_DIR
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh samza.examples.wikipedia.application.WikipediaZkLocalApplication "$@"
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index ca90ebf..69cbbbe 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -28,53 +28,25 @@
<include>NOTICE*</include>
</includes>
</fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/config</directory>
+ <includes>
+ <include>*.properties</include>
+ </includes>
+ <outputDirectory>config</outputDirectory>
+ <!-- filtered=true, so we do variable expansion so the yarn package path
+ always points to the correct spot on any machine -->
+ <filtered>true</filtered>
+ </fileSet>
</fileSets>
<files>
<file>
<source>${basedir}/src/main/resources/log4j.xml</source>
<outputDirectory>lib</outputDirectory>
</file>
- <!-- filtered=true, so we do variable expansion so the yarn package path
- always points to the correct spot on any machine -->
- <file>
- <source>${basedir}/src/main/config/wikipedia-feed.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/wikipedia-parser.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/wikipedia-stats.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
<file>
- <source>${basedir}/src/main/config/wikipedia-application.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/tumbling-pageview-counter.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/pageview-sessionizer.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/pageview-filter.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/pageview-adclick-joiner.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
+ <source>${basedir}/bin/run-wikipedia-standalone-application.sh</source>
+ <outputDirectory>bin</outputDirectory>
</file>
</files>
<dependencySets>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/config/wikipedia-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties
new file mode 100644
index 0000000..965a131
--- /dev/null
+++ b/src/main/config/wikipedia-application-local-runner.properties
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.name=wikipedia-application
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.default.system=kafka
+coordinator.zk.connect=localhost:2181
+
+# Task/Application
+app.processor-id-generator.class=org.apache.samza.runtime.UUIDGenerator
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
+systems.kafka.default.stream.samza.msg.serde=json
+
+# Streams
+streams.en-wikipedia.samza.system=wikipedia
+streams.en-wikipedia.samza.physical.name=#en.wikipedia
+
+streams.en-wiktionary.samza.system=wikipedia
+streams.en-wiktionary.samza.physical.name=#en.wiktionary
+
+streams.en-wikinews.samza.system=wikipedia
+streams.en-wikinews.samza.physical.name=#en.wikinews
+
+# Key-value storage
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
+stores.wikipedia-stats.key.serde=string
+stores.wikipedia-stats.msg.serde=integer
+
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index b0779db..3432e3d 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -88,7 +88,7 @@ public class WikipediaApplication implements StreamApplication {
OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats = graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m);
// Merge inputs
- MessageStream<WikipediaFeedEvent> allWikipediaEvents = wikipediaEvents.merge(ImmutableList.of(wiktionaryEvents, wikiNewsEvents));
+ MessageStream<WikipediaFeedEvent> allWikipediaEvents = MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
// Parse, update stats, prepare output, and send
allWikipediaEvents.map(WikipediaParser::parseEvent)
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
new file mode 100644
index 0000000..8e978bc
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.application;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.Config;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.util.CommandLine;
+import org.apache.samza.util.Util;
+
+
+/**
+ * An entry point for {@link WikipediaApplication} that runs in standalone mode using ZooKeeper.
+ * It waits for the job to finish; the job can also be ended by killing this process.
+ */
+public class WikipediaZkLocalApplication {
+
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ OptionSet options = cmdLine.parser().parse(args);
+ Config orgConfig = cmdLine.loadConfig(options);
+ Config config = Util.rewriteConfig(orgConfig);
+
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+ WikipediaApplication app = new WikipediaApplication();
+
+ runner.run(app);
+ runner.waitForFinish();
+ }
+}