Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/25 00:58:14 UTC

samza-hello-samza git commit: Upgrading to samza 1.0

Repository: samza-hello-samza
Updated Branches:
  refs/heads/latest a096c7533 -> 108b6d528


Upgrading to samza 1.0

* Adding WikipediaStatsTaskApplication, WikipediaParserTaskApplication, WikipediaFeedTaskApplication
* Removing task.inputs and task.class from config.
* Adding app.class (see the config sketch below).
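
For illustration, here is the shape of the config change taken from the wikipedia-feed job in the diff below. The old task wiring

    task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask
    task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews

is replaced by a single application class; the input streams and serdes are now declared in code via descriptors inside the TaskApplication:

    app.class=samza.examples.wikipedia.task.application.WikipediaFeedTaskApplication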

Author: Ray Matharu <rm...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>

Closes #43 from rmatharu/upgrading


Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/108b6d52
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/108b6d52
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/108b6d52

Branch: refs/heads/latest
Commit: 108b6d528ea7f70514f84ff8c05fd7ecd7bc3203
Parents: a096c75
Author: Ray Matharu <rm...@linkedin.com>
Authored: Wed Oct 24 17:58:10 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Wed Oct 24 17:58:10 2018 -0700

----------------------------------------------------------------------
 bin/deploy.sh                                   |  2 +-
 build.gradle                                    |  2 +-
 gradle.properties                               |  2 +-
 pom.xml                                         |  9 +-
 src/main/config/wikipedia-feed.properties       | 24 +-----
 src/main/config/wikipedia-parser.properties     | 22 +----
 src/main/config/wikipedia-stats.properties      | 23 ++----
 .../examples/azure/AzureZKLocalApplication.java |  2 +-
 .../application/WikipediaApplication.java       | 16 +++-
 .../WikipediaFeedTaskApplication.java           | 87 ++++++++++++++++++++
 .../WikipediaParserTaskApplication.java         | 72 ++++++++++++++++
 .../WikipediaStatsTaskApplication.java          | 68 +++++++++++++++
 12 files changed, 262 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/bin/deploy.sh
----------------------------------------------------------------------
diff --git a/bin/deploy.sh b/bin/deploy.sh
index 3c3ada2..08c0601 100755
--- a/bin/deploy.sh
+++ b/bin/deploy.sh
@@ -23,4 +23,4 @@ base_dir=`pwd`
 
 mvn clean package
 mkdir -p $base_dir/deploy/samza
-tar -xvf $base_dir/target/hello-samza-1.0.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza
+tar -xvf $base_dir/target/hello-samza-1.0.1-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6201bc5..a897372 100644
--- a/build.gradle
+++ b/build.gradle
@@ -50,7 +50,7 @@ dependencies {
     compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kafka_2.11', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: "$SAMZA_VERSION")
-
+    compile(group: 'org.apache.samza', name: 'samza-azure', version: "$SAMZA_VERSION")
     explode (group: 'org.apache.samza', name: 'samza-shell',  ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
 
     runtime(group: 'org.apache.samza', name: 'samza-core_2.11', version: "$SAMZA_VERSION")

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 34a540a..bfc8582 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-SAMZA_VERSION=1.0.0-SNAPSHOT
+SAMZA_VERSION=1.0.1
 KAFKA_VERSION=0.11.0.2
 HADOOP_VERSION=2.6.1
 

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8659683..167ea1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
 
   <groupId>org.apache.samza</groupId>
   <artifactId>hello-samza</artifactId>
-  <version>1.0.0-SNAPSHOT</version>
+  <version>1.0.1-SNAPSHOT</version>
   <packaging>jar</packaging>
   <name>Samza Example</name>
   <description>
@@ -123,6 +123,11 @@ under the License.
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.version}</version>
     </dependency>
+   <dependency>
+    <groupId>org.apache.httpcomponents</groupId>
+    <artifactId>httpcore</artifactId>
+    <version>4.4.1</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
@@ -153,7 +158,7 @@ under the License.
   <properties>
     <!-- maven specific properties -->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <samza.version>1.0.0-SNAPSHOT</samza.version>
+    <samza.version>1.0.1-SNAPSHOT</samza.version>
     <hadoop.version>2.6.1</hadoop.version>
   </properties>
 

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/config/wikipedia-feed.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-feed.properties b/src/main/config/wikipedia-feed.properties
index 180d749..9fee678 100644
--- a/src/main/config/wikipedia-feed.properties
+++ b/src/main/config/wikipedia-feed.properties
@@ -19,29 +19,11 @@
 job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
 job.name=wikipedia-feed
 
-# YARN
+# YARN package path
 yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
 
-# Task
-task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask
-task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews
+# TaskApplication class
+app.class=samza.examples.wikipedia.task.application.WikipediaFeedTaskApplication
 
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-
-# Wikipedia System
-systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
-systems.wikipedia.host=irc.wikimedia.org
-systems.wikipedia.port=6667
-
-# Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=json
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
 # Add configuration to disable checkpointing for this job once it is available in the Coordinator Stream model
 # See https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346 for more details
-job.coordinator.replication.factor=1

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/config/wikipedia-parser.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-parser.properties b/src/main/config/wikipedia-parser.properties
index e8f3fa0..d9614c2 100644
--- a/src/main/config/wikipedia-parser.properties
+++ b/src/main/config/wikipedia-parser.properties
@@ -22,23 +22,5 @@ job.name=wikipedia-parser
 # YARN
 yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
 
-# Task
-task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
-task.inputs=kafka.wikipedia-raw
-
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
-
-# Systems
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=json
-systems.kafka.streams.metrics.samza.msg.serde=metrics
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.consumer.auto.offset.reset=largest
-systems.kafka.producer.bootstrap.servers=localhost:9092
-
-# Job Coordinator
-job.coordinator.system=kafka
-# Normally, this would be 3, but we have only one broker.
-job.coordinator.replication.factor=1
+# TaskApplication class
+app.class=samza.examples.wikipedia.task.application.WikipediaParserTaskApplication
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/config/wikipedia-stats.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties
index 0a1cf31..7da456b 100644
--- a/src/main/config/wikipedia-stats.properties
+++ b/src/main/config/wikipedia-stats.properties
@@ -19,12 +19,13 @@
 job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
 job.name=wikipedia-stats
 
-# YARN
+# YARN package path
 yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
 
-# Task
-task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask
-task.inputs=kafka.wikipedia-edits
+# TaskApplication class
+app.class=samza.examples.wikipedia.task.application.WikipediaStatsTaskApplication
+
+# Setting the window frequency in milliseconds
 task.window.ms=10000
 
 # Metrics
@@ -33,17 +34,10 @@ metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapsho
 metrics.reporter.snapshot.stream=kafka.metrics
 metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
 
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+# Serializers (used below in specifying the stores' serdes)
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
 serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
 
-# Systems
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=json
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.consumer.auto.offset.reset=largest
-systems.kafka.producer.bootstrap.servers=localhost:9092
 
 # Key-value storage
 stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
@@ -57,8 +51,3 @@ stores.wikipedia-stats.changelog.replication.factor=1
 # Normally, we'd set this much higher, but we want things to look snappy in the demo.
 stores.wikipedia-stats.write.batch.size=0
 stores.wikipedia-stats.object.cache.size=0
-
-# Job Coordinator
-job.coordinator.system=kafka
-# Normally, this would be 3, but we have only one broker.
-job.coordinator.replication.factor=1

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/azure/AzureZKLocalApplication.java b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
index 01075e2..462e389 100644
--- a/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
+++ b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
@@ -23,7 +23,7 @@ import joptsimple.OptionSet;
 import org.apache.samza.config.Config;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.util.CommandLine;
-import samza.examples.azure.AzureApplication;
+
 
 public class AzureZKLocalApplication {
 

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index 60bbe15..9077480 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -87,7 +87,11 @@ public class WikipediaApplication implements StreamApplication, Serializable {
 
   @Override
   public void describe(StreamApplicationDescriptor appDescriptor) {
+
+    // Define a SystemDescriptor for Wikipedia data
     WikipediaSystemDescriptor wikipediaSystemDescriptor = new WikipediaSystemDescriptor("irc.wikimedia.org", 6667);
+
+    // Define InputDescriptors for consuming wikipedia data
     WikipediaInputDescriptor wikipediaInputDescriptor = wikipediaSystemDescriptor
         .getInputDescriptor("en-wikipedia")
         .withChannel("#en.wikipedia");
@@ -98,14 +102,19 @@ public class WikipediaApplication implements StreamApplication, Serializable {
         .getInputDescriptor("en-wikinews")
         .withChannel("#en.wikinews");
 
+    // Define a system descriptor for Kafka
     KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
         .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
         .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
         .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
 
+    // Define an output descriptor
     KafkaOutputDescriptor<WikipediaStatsOutput> statsOutputDescriptor =
         kafkaSystemDescriptor.getOutputDescriptor("wikipedia-stats", new JsonSerdeV2<>(WikipediaStatsOutput.class));
 
+
+    // Set the default system descriptor to Kafka, so that it is used for all
+    // internal resources, e.g., kafka topic for checkpointing, coordinator stream.
     appDescriptor.withDefaultSystem(kafkaSystemDescriptor);
     MessageStream<WikipediaFeedEvent> wikipediaEvents = appDescriptor.getInputStream(wikipediaInputDescriptor);
     MessageStream<WikipediaFeedEvent> wiktionaryEvents = appDescriptor.getInputStream(wiktionaryInputDescriptor);
@@ -155,7 +164,9 @@ public class WikipediaApplication implements StreamApplication, Serializable {
 
       // Update persisted total
       Integer editsAllTime = store.get(EDIT_COUNT_KEY);
-      if (editsAllTime == null) editsAllTime = 0;
+      if (editsAllTime == null) {
+        editsAllTime = 0;
+      }
       editsAllTime++;
       store.put(EDIT_COUNT_KEY, editsAllTime);
 
@@ -185,8 +196,7 @@ public class WikipediaApplication implements StreamApplication, Serializable {
    */
   private WikipediaStatsOutput formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
     WikipediaStats stats = statsWindowPane.getMessage();
-    return new WikipediaStatsOutput(
-        stats.edits, stats.totalEdits, stats.byteDiff, stats.titles.size(), stats.counts);
+    return new WikipediaStatsOutput(stats.edits, stats.totalEdits, stats.byteDiff, stats.titles.size(), stats.counts);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java b/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java
new file mode 100644
index 0000000..12d29b0
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.wikipedia.task.application;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.task.StreamTaskFactory;
+import samza.examples.wikipedia.system.descriptors.WikipediaInputDescriptor;
+import samza.examples.wikipedia.system.descriptors.WikipediaSystemDescriptor;
+import samza.examples.wikipedia.task.WikipediaFeedStreamTask;
+
+
+/**
+ * This TaskApplication is responsible for consuming data from wikipedia, wiktionary, and wikinews data sources, and
+ * merging them into a single Kafka topic called wikipedia-raw.
+ *
+ *
+ */
+public class WikipediaFeedTaskApplication implements TaskApplication {
+
+  private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+  private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+  private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+  @Override
+  public void describe(TaskApplicationDescriptor taskApplicationDescriptor) {
+
+    // Define a SystemDescriptor for Wikipedia data
+    WikipediaSystemDescriptor wikipediaSystemDescriptor = new WikipediaSystemDescriptor("irc.wikimedia.org", 6667);
+
+    // Define InputDescriptors for consuming wikipedia data
+    WikipediaInputDescriptor wikipediaInputDescriptor =
+        wikipediaSystemDescriptor.getInputDescriptor("en-wikipedia").withChannel("#en.wikipedia");
+    WikipediaInputDescriptor wiktionaryInputDescriptor =
+        wikipediaSystemDescriptor.getInputDescriptor("en-wiktionary").withChannel("#en.wiktionary");
+    WikipediaInputDescriptor wikiNewsInputDescriptor =
+        wikipediaSystemDescriptor.getInputDescriptor("en-wikinews").withChannel("#en.wikinews");
+
+    // Define a system descriptor for Kafka, which is our output system
+    KafkaSystemDescriptor kafkaSystemDescriptor =
+        new KafkaSystemDescriptor("kafka").withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+            .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+            .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+    // Define an output descriptor
+    KafkaOutputDescriptor kafkaOutputDescriptor =
+        kafkaSystemDescriptor.getOutputDescriptor("wikipedia-raw", new JsonSerde<>());
+
+    // Set the default system descriptor to Kafka, so that it is used for all
+    // internal resources, e.g., kafka topic for checkpointing, coordinator stream.
+    taskApplicationDescriptor.withDefaultSystem(kafkaSystemDescriptor);
+
+    // Set the inputs
+    taskApplicationDescriptor.withInputStream(wikipediaInputDescriptor);
+    taskApplicationDescriptor.withInputStream(wiktionaryInputDescriptor);
+    taskApplicationDescriptor.withInputStream(wikiNewsInputDescriptor);
+
+    // Set the output
+    taskApplicationDescriptor.withOutputStream(kafkaOutputDescriptor);
+
+    // Set the task factory
+    taskApplicationDescriptor.withTaskFactory((StreamTaskFactory) () -> new WikipediaFeedStreamTask());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java b/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java
new file mode 100644
index 0000000..5b6275b
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.wikipedia.task.application;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.task.StreamTaskFactory;
+import samza.examples.wikipedia.task.WikipediaParserStreamTask;
+
+
+public class WikipediaParserTaskApplication implements TaskApplication {
+
+  private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+  private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+  private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+  @Override
+  public void describe(TaskApplicationDescriptor taskApplicationDescriptor) {
+
+    // Define a system descriptor for Kafka, which is both our input and output system
+    KafkaSystemDescriptor kafkaSystemDescriptor =
+        new KafkaSystemDescriptor("kafka").withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+            .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+            .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+    // Input descriptor for the wikipedia-raw topic
+    KafkaInputDescriptor kafkaInputDescriptor =
+        kafkaSystemDescriptor.getInputDescriptor("wikipedia-raw", new JsonSerde<>());
+
+    // Output descriptor for the wikipedia-edits topic
+    KafkaOutputDescriptor kafkaOutputDescriptor =
+        kafkaSystemDescriptor.getOutputDescriptor("wikipedia-edits", new JsonSerde<>());
+
+    // Set the default system descriptor to Kafka, so that it is used for all
+    // internal resources, e.g., kafka topic for checkpointing, coordinator stream.
+    taskApplicationDescriptor.withDefaultSystem(kafkaSystemDescriptor);
+
+    // Set the input
+    taskApplicationDescriptor.withInputStream(kafkaInputDescriptor);
+
+    // Set the output
+    taskApplicationDescriptor.withOutputStream(kafkaOutputDescriptor);
+
+    // Set the task factory
+    taskApplicationDescriptor.withTaskFactory((StreamTaskFactory) () -> new WikipediaParserStreamTask());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java b/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java
new file mode 100644
index 0000000..68ecf4a
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.wikipedia.task.application;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.task.StreamTaskFactory;
+import samza.examples.wikipedia.task.WikipediaStatsStreamTask;
+
+
+public class WikipediaStatsTaskApplication implements TaskApplication {
+
+  private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+  private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+  private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+  @Override
+  public void describe(TaskApplicationDescriptor taskApplicationDescriptor) {
+
+    // Define a system descriptor for Kafka
+    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
+        .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+        .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+        .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+    // Input descriptor for the wikipedia-edits topic
+    KafkaInputDescriptor kafkaInputDescriptor =
+        kafkaSystemDescriptor.getInputDescriptor("wikipedia-edits", new JsonSerde<>());
+
+    // Set the default system descriptor to Kafka, so that it is used for all
+    // internal resources, e.g., kafka topic for checkpointing, coordinator stream.
+    taskApplicationDescriptor.withDefaultSystem(kafkaSystemDescriptor);
+
+    // Set the input
+    taskApplicationDescriptor.withInputStream(kafkaInputDescriptor);
+
+    // Set the output
+    taskApplicationDescriptor.withOutputStream(
+        kafkaSystemDescriptor.getOutputDescriptor("wikipedia-stats", new JsonSerde<>()));
+
+    // Set the task factory
+    taskApplicationDescriptor.withTaskFactory((StreamTaskFactory) () -> new WikipediaStatsStreamTask());
+  }
+}
+