You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/07 03:28:12 UTC
[12/18] samza-hello-samza git commit: Upgrading to samza 1.0
Upgrading to samza 1.0
* Adding WikipediaStatsTaskApplication, WikipediaParserTaskApplication, WikipediaFeedTaskApplication
* Removing task.inputs and task.class from config.
* Adding app.class.
Author: Ray Matharu <rm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #43 from rmatharu/upgrading
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/108b6d52
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/108b6d52
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/108b6d52
Branch: refs/heads/master
Commit: 108b6d528ea7f70514f84ff8c05fd7ecd7bc3203
Parents: a096c75
Author: Ray Matharu <rm...@linkedin.com>
Authored: Wed Oct 24 17:58:10 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Wed Oct 24 17:58:10 2018 -0700
----------------------------------------------------------------------
bin/deploy.sh | 2 +-
build.gradle | 2 +-
gradle.properties | 2 +-
pom.xml | 9 +-
src/main/config/wikipedia-feed.properties | 24 +-----
src/main/config/wikipedia-parser.properties | 22 +----
src/main/config/wikipedia-stats.properties | 23 ++----
.../examples/azure/AzureZKLocalApplication.java | 2 +-
.../application/WikipediaApplication.java | 16 +++-
.../WikipediaFeedTaskApplication.java | 87 ++++++++++++++++++++
.../WikipediaParserTaskApplication.java | 72 ++++++++++++++++
.../WikipediaStatsTaskApplication.java | 68 +++++++++++++++
12 files changed, 262 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/bin/deploy.sh
----------------------------------------------------------------------
diff --git a/bin/deploy.sh b/bin/deploy.sh
index 3c3ada2..08c0601 100755
--- a/bin/deploy.sh
+++ b/bin/deploy.sh
@@ -23,4 +23,4 @@ base_dir=`pwd`
mvn clean package
mkdir -p $base_dir/deploy/samza
-tar -xvf $base_dir/target/hello-samza-1.0.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza
+tar -xvf $base_dir/target/hello-samza-1.0.1-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6201bc5..a897372 100644
--- a/build.gradle
+++ b/build.gradle
@@ -50,7 +50,7 @@ dependencies {
compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
compile(group: 'org.apache.samza', name: 'samza-kafka_2.11', version: "$SAMZA_VERSION")
compile(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: "$SAMZA_VERSION")
-
+ compile(group: 'org.apache.samza', name: 'samza-azure', version: "$SAMZA_VERSION")
explode (group: 'org.apache.samza', name: 'samza-shell', ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
runtime(group: 'org.apache.samza', name: 'samza-core_2.11', version: "$SAMZA_VERSION")
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 34a540a..bfc8582 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,7 +17,7 @@
* under the License.
*/
-SAMZA_VERSION=1.0.0-SNAPSHOT
+SAMZA_VERSION=1.0.1
KAFKA_VERSION=0.11.0.2
HADOOP_VERSION=2.6.1
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8659683..167ea1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
<groupId>org.apache.samza</groupId>
<artifactId>hello-samza</artifactId>
- <version>1.0.0-SNAPSHOT</version>
+ <version>1.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Samza Example</name>
<description>
@@ -123,6 +123,11 @@ under the License.
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.1</version>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
@@ -153,7 +158,7 @@ under the License.
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>1.0.0-SNAPSHOT</samza.version>
+ <samza.version>1.0.1-SNAPSHOT</samza.version>
<hadoop.version>2.6.1</hadoop.version>
</properties>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/config/wikipedia-feed.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-feed.properties b/src/main/config/wikipedia-feed.properties
index 180d749..9fee678 100644
--- a/src/main/config/wikipedia-feed.properties
+++ b/src/main/config/wikipedia-feed.properties
@@ -19,29 +19,11 @@
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=wikipedia-feed
-# YARN
+# YARN package path
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
-# Task
-task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask
-task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews
+# TaskApplication class
+app.class=samza.examples.wikipedia.task.application.WikipediaFeedTaskApplication
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-
-# Wikipedia System
-systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
-systems.wikipedia.host=irc.wikimedia.org
-systems.wikipedia.port=6667
-
-# Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=json
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
# Add configuration to disable checkpointing for this job once it is available in the Coordinator Stream model
# See https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346 for more details
-job.coordinator.replication.factor=1
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/config/wikipedia-parser.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-parser.properties b/src/main/config/wikipedia-parser.properties
index e8f3fa0..d9614c2 100644
--- a/src/main/config/wikipedia-parser.properties
+++ b/src/main/config/wikipedia-parser.properties
@@ -22,23 +22,5 @@ job.name=wikipedia-parser
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
-# Task
-task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
-task.inputs=kafka.wikipedia-raw
-
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
-
-# Systems
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=json
-systems.kafka.streams.metrics.samza.msg.serde=metrics
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.consumer.auto.offset.reset=largest
-systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-# Normally, this would be 3, but we have only one broker.
-job.coordinator.replication.factor=1
+# TaskApplication class
+app.class=samza.examples.wikipedia.task.application.WikipediaParserTaskApplication
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/config/wikipedia-stats.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties
index 0a1cf31..7da456b 100644
--- a/src/main/config/wikipedia-stats.properties
+++ b/src/main/config/wikipedia-stats.properties
@@ -19,12 +19,13 @@
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=wikipedia-stats
-# YARN
+# YARN package path
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
-# Task
-task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask
-task.inputs=kafka.wikipedia-edits
+# TaskApplication class
+app.class=samza.examples.wikipedia.task.application.WikipediaStatsTaskApplication
+
+# Setting the window frequency in milliseconds
task.window.ms=10000
# Metrics
@@ -33,17 +34,10 @@ metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapsho
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+# Serializers (used below in specifying the stores' serdes)
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
-# Systems
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=json
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.consumer.auto.offset.reset=largest
-systems.kafka.producer.bootstrap.servers=localhost:9092
# Key-value storage
stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
@@ -57,8 +51,3 @@ stores.wikipedia-stats.changelog.replication.factor=1
# Normally, we'd set this much higher, but we want things to look snappy in the demo.
stores.wikipedia-stats.write.batch.size=0
stores.wikipedia-stats.object.cache.size=0
-
-# Job Coordinator
-job.coordinator.system=kafka
-# Normally, this would be 3, but we have only one broker.
-job.coordinator.replication.factor=1
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/azure/AzureZKLocalApplication.java b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
index 01075e2..462e389 100644
--- a/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
+++ b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
@@ -23,7 +23,7 @@ import joptsimple.OptionSet;
import org.apache.samza.config.Config;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.util.CommandLine;
-import samza.examples.azure.AzureApplication;
+
public class AzureZKLocalApplication {
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index 60bbe15..9077480 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -87,7 +87,11 @@ public class WikipediaApplication implements StreamApplication, Serializable {
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
+
+ // Define a SystemDescriptor for Wikipedia data
WikipediaSystemDescriptor wikipediaSystemDescriptor = new WikipediaSystemDescriptor("irc.wikimedia.org", 6667);
+
+ // Define InputDescriptors for consuming wikipedia data
WikipediaInputDescriptor wikipediaInputDescriptor = wikipediaSystemDescriptor
.getInputDescriptor("en-wikipedia")
.withChannel("#en.wikipedia");
@@ -98,14 +102,19 @@ public class WikipediaApplication implements StreamApplication, Serializable {
.getInputDescriptor("en-wikinews")
.withChannel("#en.wikinews");
+ // Define a system descriptor for Kafka
KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
.withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
.withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
.withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+ // Define an output descriptor
KafkaOutputDescriptor<WikipediaStatsOutput> statsOutputDescriptor =
kafkaSystemDescriptor.getOutputDescriptor("wikipedia-stats", new JsonSerdeV2<>(WikipediaStatsOutput.class));
+
+ // Set the default system descriptor to Kafka, so that it is used for all
+ // internal resources, e.g., kafka topic for checkpointing, coordinator stream.
appDescriptor.withDefaultSystem(kafkaSystemDescriptor);
MessageStream<WikipediaFeedEvent> wikipediaEvents = appDescriptor.getInputStream(wikipediaInputDescriptor);
MessageStream<WikipediaFeedEvent> wiktionaryEvents = appDescriptor.getInputStream(wiktionaryInputDescriptor);
@@ -155,7 +164,9 @@ public class WikipediaApplication implements StreamApplication, Serializable {
// Update persisted total
Integer editsAllTime = store.get(EDIT_COUNT_KEY);
- if (editsAllTime == null) editsAllTime = 0;
+ if (editsAllTime == null) {
+ editsAllTime = 0;
+ }
editsAllTime++;
store.put(EDIT_COUNT_KEY, editsAllTime);
@@ -185,8 +196,7 @@ public class WikipediaApplication implements StreamApplication, Serializable {
*/
private WikipediaStatsOutput formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
WikipediaStats stats = statsWindowPane.getMessage();
- return new WikipediaStatsOutput(
- stats.edits, stats.totalEdits, stats.byteDiff, stats.titles.size(), stats.counts);
+ return new WikipediaStatsOutput(stats.edits, stats.totalEdits, stats.byteDiff, stats.titles.size(), stats.counts);
}
/**
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java b/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java
new file mode 100644
index 0000000..12d29b0
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.wikipedia.task.application;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.task.StreamTaskFactory;
+import samza.examples.wikipedia.system.descriptors.WikipediaInputDescriptor;
+import samza.examples.wikipedia.system.descriptors.WikipediaSystemDescriptor;
+import samza.examples.wikipedia.task.WikipediaFeedStreamTask;
+
+
+/**
+ * This TaskApplication is responsible for consuming data from wikipedia, wiktionary, and wikinews data sources, and
+ * merging them into a single Kafka topic called wikipedia-raw.
+ *
+ * <p>Tasks are created as {@link WikipediaFeedStreamTask} instances, and Kafka is set as the default system.
+ */
+public class WikipediaFeedTaskApplication implements TaskApplication {
+
+ private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+ private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+ private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+ @Override
+ public void describe(TaskApplicationDescriptor taskApplicationDescriptor) {
+
+ // Define a SystemDescriptor for Wikipedia data (host irc.wikimedia.org, port 6667)
+ WikipediaSystemDescriptor wikipediaSystemDescriptor = new WikipediaSystemDescriptor("irc.wikimedia.org", 6667);
+
+ // Define one InputDescriptor per channel: #en.wikipedia, #en.wiktionary and #en.wikinews
+ WikipediaInputDescriptor wikipediaInputDescriptor =
+ wikipediaSystemDescriptor.getInputDescriptor("en-wikipedia").withChannel("#en.wikipedia");
+ WikipediaInputDescriptor wiktionaryInputDescriptor =
+ wikipediaSystemDescriptor.getInputDescriptor("en-wiktionary").withChannel("#en.wiktionary");
+ WikipediaInputDescriptor wikiNewsInputDescriptor =
+ wikipediaSystemDescriptor.getInputDescriptor("en-wikinews").withChannel("#en.wikinews");
+
+ // Define a system descriptor for Kafka, which is our output system (connection settings come from the constants above)
+ KafkaSystemDescriptor kafkaSystemDescriptor =
+ new KafkaSystemDescriptor("kafka").withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+ .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+ .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+ // Define an output descriptor for the wikipedia-raw topic, using JsonSerde. NOTE(review): declared as a raw type - consider adding a type argument.
+ KafkaOutputDescriptor kafkaOutputDescriptor =
+ kafkaSystemDescriptor.getOutputDescriptor("wikipedia-raw", new JsonSerde<>());
+
+ // Set the default system descriptor to Kafka, so that it is used for all
+ // internal resources, e.g., kafka topic for checkpointing, coordinator stream.
+ taskApplicationDescriptor.withDefaultSystem(kafkaSystemDescriptor);
+
+ // Set the inputs: the three wikipedia channel descriptors defined above
+ taskApplicationDescriptor.withInputStream(wikipediaInputDescriptor);
+ taskApplicationDescriptor.withInputStream(wiktionaryInputDescriptor);
+ taskApplicationDescriptor.withInputStream(wikiNewsInputDescriptor);
+
+ // Set the output: the wikipedia-raw Kafka topic
+ taskApplicationDescriptor.withOutputStream(kafkaOutputDescriptor);
+
+ // Set the task factory; each invocation creates a new WikipediaFeedStreamTask
+ taskApplicationDescriptor.withTaskFactory((StreamTaskFactory) () -> new WikipediaFeedStreamTask());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java b/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java
new file mode 100644
index 0000000..5b6275b
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.wikipedia.task.application;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.task.StreamTaskFactory;
+import samza.examples.wikipedia.task.WikipediaParserStreamTask;
+
+/** TaskApplication that declares Kafka topic wikipedia-raw as input and wikipedia-edits as output, with {@link WikipediaParserStreamTask} as the task. */
+public class WikipediaParserTaskApplication implements TaskApplication {
+
+ private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+ private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+ private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+ @Override
+ public void describe(TaskApplicationDescriptor taskApplicationDescriptor) {
+
+ // Define a system descriptor for Kafka, which is both our input and output system (connection settings come from the constants above)
+ KafkaSystemDescriptor kafkaSystemDescriptor =
+ new KafkaSystemDescriptor("kafka").withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+ .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+ .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+ // Input descriptor for the wikipedia-raw topic, using JsonSerde. NOTE(review): declared as a raw type - consider adding a type argument.
+ KafkaInputDescriptor kafkaInputDescriptor =
+ kafkaSystemDescriptor.getInputDescriptor("wikipedia-raw", new JsonSerde<>());
+
+ // Output descriptor for the wikipedia-edits topic, using JsonSerde. NOTE(review): declared as a raw type - consider adding a type argument.
+ KafkaOutputDescriptor kafkaOutputDescriptor =
+ kafkaSystemDescriptor.getOutputDescriptor("wikipedia-edits", new JsonSerde<>());
+
+ // Set the default system descriptor to Kafka, so that it is used for all
+ // internal resources, e.g., kafka topic for checkpointing, coordinator stream.
+ taskApplicationDescriptor.withDefaultSystem(kafkaSystemDescriptor);
+
+ // Set the input: the wikipedia-raw Kafka topic
+ taskApplicationDescriptor.withInputStream(kafkaInputDescriptor);
+
+ // Set the output: the wikipedia-edits Kafka topic
+ taskApplicationDescriptor.withOutputStream(kafkaOutputDescriptor);
+
+ // Set the task factory; each invocation creates a new WikipediaParserStreamTask
+ taskApplicationDescriptor.withTaskFactory((StreamTaskFactory) () -> new WikipediaParserStreamTask());
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java b/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java
new file mode 100644
index 0000000..68ecf4a
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.wikipedia.task.application;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.task.StreamTaskFactory;
+import samza.examples.wikipedia.task.WikipediaStatsStreamTask;
+
+/** TaskApplication that declares Kafka topic wikipedia-edits as input and wikipedia-stats as output, with {@link WikipediaStatsStreamTask} as the task. */
+public class WikipediaStatsTaskApplication implements TaskApplication {
+
+ private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+ private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+ private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+ @Override
+ public void describe(TaskApplicationDescriptor taskApplicationDescriptor) {
+
+ // Define a system descriptor for Kafka (connection settings come from the constants above)
+ KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
+ .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+ .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+ .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+ // Input descriptor for the wikipedia-edits topic, using JsonSerde. NOTE(review): declared as a raw type - consider adding a type argument.
+ KafkaInputDescriptor kafkaInputDescriptor =
+ kafkaSystemDescriptor.getInputDescriptor("wikipedia-edits", new JsonSerde<>());
+
+ // Set the default system descriptor to Kafka, so that it is used for all
+ // internal resources, e.g., kafka topic for checkpointing, coordinator stream.
+ taskApplicationDescriptor.withDefaultSystem(kafkaSystemDescriptor);
+
+ // Set the input: the wikipedia-edits Kafka topic
+ taskApplicationDescriptor.withInputStream(kafkaInputDescriptor);
+
+ // Set the output: the wikipedia-stats Kafka topic, using JsonSerde (descriptor created inline)
+ taskApplicationDescriptor.withOutputStream(
+ kafkaSystemDescriptor.getOutputDescriptor("wikipedia-stats", new JsonSerde<>()));
+
+ // Set the task factory; each invocation creates a new WikipediaStatsStreamTask
+ taskApplicationDescriptor.withTaskFactory((StreamTaskFactory) () -> new WikipediaStatsStreamTask());
+ }
+}
+