You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/12/09 19:15:08 UTC

[1/3] incubator-samza-hello-samza git commit: SAMZA-495; upgrade to 0.9.0 samza

Repository: incubator-samza-hello-samza
Updated Branches:
  refs/heads/latest 68d9961ef -> 20a32e014
  refs/heads/master 59d1877f0 -> f9efa43ac


SAMZA-495; upgrade to 0.9.0 samza


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/commit/20a32e01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/tree/20a32e01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/diff/20a32e01

Branch: refs/heads/latest
Commit: 20a32e014635d8dd50ac716f1bddcc5d80f1b042
Parents: 68d9961
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Dec 9 10:14:20 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Dec 9 10:14:20 2014 -0800

----------------------------------------------------------------------
 pom.xml                                    | 6 +++---
 src/main/assembly/src.xml                  | 3 +--
 src/main/config/wikipedia-stats.properties | 2 +-
 3 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/20a32e01/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0e1f918..ab73a07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
 
   <groupId>org.apache.samza</groupId>
   <artifactId>hello-samza</artifactId>
-  <version>0.8.0</version>
+  <version>0.9.0</version>
   <packaging>jar</packaging>
   <name>Samza Example</name>
   <description>
@@ -70,7 +70,7 @@ under the License.
     </dependency>
     <dependency>
       <groupId>org.apache.samza</groupId>
-      <artifactId>samza-kv-leveldb_2.10</artifactId>
+      <artifactId>samza-kv-rocksdb_2.10</artifactId>
       <version>${samza.version}</version>
     </dependency>
     <dependency>
@@ -113,7 +113,7 @@ under the License.
   <properties>
     <!-- maven specific properties -->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <samza.version>0.8.0-SNAPSHOT</samza.version>
+    <samza.version>0.9.0-SNAPSHOT</samza.version>
   </properties>
 
   <developers>

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/20a32e01/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 8f9afd8..f57fee2 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -66,9 +66,8 @@
       <includes>
         <include>org.apache.samza:samza-core_2.10</include>
         <include>org.apache.samza:samza-kafka_2.10</include>
-        <include>org.apache.samza:samza-serializers_2.10</include>
         <include>org.apache.samza:samza-yarn_2.10</include>
-        <include>org.apache.samza:samza-kv-leveldb_2.10</include>
+        <include>org.apache.samza:samza-kv-rocksdb_2.10</include>
         <include>org.apache.samza:samza-log4j</include>
         <include>org.apache.samza:hello-samza</include>
         <include>org.slf4j:slf4j-log4j12</include>

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/20a32e01/src/main/config/wikipedia-stats.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties
index d06d559..69eff90 100644
--- a/src/main/config/wikipedia-stats.properties
+++ b/src/main/config/wikipedia-stats.properties
@@ -47,7 +47,7 @@ systems.kafka.producer.producer.type=sync
 systems.kafka.producer.batch.num.messages=1
 
 # Key-value storage
-stores.wikipedia-stats.factory=org.apache.samza.storage.kv.LevelDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
 stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
 stores.wikipedia-stats.key.serde=string
 stores.wikipedia-stats.msg.serde=integer


[2/3] incubator-samza-hello-samza git commit: SAMZA-495; upgrade to 0.8.0 samza

Posted by cr...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
new file mode 100644
index 0000000..07cd8ac
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.task;
+
+import java.util.Map;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+
+/**
+ * This task is very simple. All it does is take messages that it receives, and
+ * sends them to a Kafka topic called wikipedia-raw.
+ */
+public class WikipediaFeedStreamTask implements StreamTask {
+  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-raw");
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    Map<String, Object> outgoingMap = WikipediaFeedEvent.toMap((WikipediaFeedEvent) envelope.getMessage());
+    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
new file mode 100644
index 0000000..0505f58
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.task;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+
+public class WikipediaParserStreamTask implements StreamTask {
+  @SuppressWarnings("unchecked")
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
+    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
+
+    try {
+      Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
+
+      parsedJsonObject.put("channel", event.getChannel());
+      parsedJsonObject.put("source", event.getSource());
+      parsedJsonObject.put("time", event.getTime());
+
+      collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
+    } catch (Exception e) {
+      System.err.println("Unable to parse line: " + event);
+    }
+  }
+
+  public static Map<String, Object> parse(String line) {
+    Pattern p = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)");
+    Matcher m = p.matcher(line);
+
+    if (m.find() && m.groupCount() == 6) {
+      String title = m.group(1);
+      String flags = m.group(2);
+      String diffUrl = m.group(3);
+      String user = m.group(4);
+      int byteDiff = Integer.parseInt(m.group(5));
+      String summary = m.group(6);
+
+      Map<String, Boolean> flagMap = new HashMap<String, Boolean>();
+
+      flagMap.put("is-minor", flags.contains("M"));
+      flagMap.put("is-new", flags.contains("N"));
+      flagMap.put("is-unpatrolled", flags.contains("!"));
+      flagMap.put("is-bot-edit", flags.contains("B"));
+      flagMap.put("is-special", title.startsWith("Special:"));
+      flagMap.put("is-talk", title.startsWith("Talk:"));
+
+      Map<String, Object> root = new HashMap<String, Object>();
+
+      root.put("title", title);
+      root.put("user", user);
+      root.put("unparsed-flags", flags);
+      root.put("diff-bytes", byteDiff);
+      root.put("diff-url", diffUrl);
+      root.put("summary", summary);
+      root.put("flags", flagMap);
+
+      return root;
+    } else {
+      throw new IllegalArgumentException();
+    }
+  }
+
+  public static void main(String[] args) {
+    String[] lines = new String[] { "[[Wikipedia talk:Articles for creation/Lords of War]]  http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]", "[[David Shepard (surgeon)]] M http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * Jacobsievers * (+115) /* American Revolution (1775�1783) */  Added to note regarding David Shepard's brothers" };
+
+    for (String line : lines) {
+      System.out.println(parse(line));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
new file mode 100644
index 0000000..60fd93d
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.task;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.WindowableTask;
+
+public class WikipediaStatsStreamTask implements StreamTask, InitableTask, WindowableTask {
+  private int edits = 0;
+  private int byteDiff = 0;
+  private Set<String> titles = new HashSet<String>();
+  private Map<String, Integer> counts = new HashMap<String, Integer>();
+  private KeyValueStore<String, Integer> store;
+
+  public void init(Config config, TaskContext context) {
+    this.store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats");
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    Map<String, Object> edit = (Map<String, Object>) envelope.getMessage();
+    Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags");
+
+    Integer editsAllTime = store.get("count-edits-all-time");
+    if (editsAllTime == null) editsAllTime = 0;
+    store.put("count-edits-all-time", editsAllTime + 1);
+
+    edits += 1;
+    titles.add((String) edit.get("title"));
+    byteDiff += (Integer) edit.get("diff-bytes");
+
+    for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
+      if (Boolean.TRUE.equals(flag.getValue())) {
+        Integer count = counts.get(flag.getKey());
+
+        if (count == null) {
+          count = 0;
+        }
+
+        count += 1;
+        counts.put(flag.getKey(), count);
+      }
+    }
+  }
+
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) {
+    counts.put("edits", edits);
+    counts.put("bytes-added", byteDiff);
+    counts.put("unique-titles", titles.size());
+    counts.put("edits-all-time", store.get("count-edits-all-time"));
+
+    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-stats"), counts));
+
+    // Reset counts after windowing.
+    edits = 0;
+    byteDiff = 0;
+    titles = new HashSet<String>();
+    counts = new HashMap<String, Integer>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
new file mode 100644
index 0000000..f0de765
--- /dev/null
+++ b/src/main/resources/log4j.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="jmx" class="org.apache.samza.logging.log4j.JmxAppender" />
+
+  <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender">
+     <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
+     <param name="DatePattern" value="'.'yyyy-MM-dd" />
+     <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
+     </layout>
+  </appender>
+  <root>
+    <priority value="info" />
+    <appender-ref ref="RollingAppender"/>
+    <appender-ref ref="jmx" />
+  </root>
+</log4j:configuration>


[3/3] incubator-samza-hello-samza git commit: SAMZA-495; upgrade to 0.8.0 samza

Posted by cr...@apache.org.
SAMZA-495; upgrade to 0.8.0 samza


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/commit/f9efa43a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/tree/f9efa43a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/diff/f9efa43a

Branch: refs/heads/master
Commit: f9efa43acb477ffba989ca78c5e66d9e93f4b68f
Parents: 59d1877
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Dec 9 10:14:49 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Dec 9 10:14:49 2014 -0800

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 bin/grid                                        |   8 +-
 pom.xml                                         | 195 ++++++-----
 samza-job-package/pom.xml                       | 121 -------
 samza-job-package/src/main/assembly/src.xml     |  80 -----
 .../src/main/config/wikipedia-feed.properties   |  44 ---
 .../src/main/config/wikipedia-parser.properties |  52 ---
 .../src/main/config/wikipedia-stats.properties  |  53 ---
 samza-job-package/src/main/resources/log4j.xml  |  36 --
 samza-wikipedia/pom.xml                         |  65 ----
 .../wikipedia/system/WikipediaConsumer.java     |  77 -----
 .../wikipedia/system/WikipediaFeed.java         | 332 -------------------
 .../system/WikipediaSystemFactory.java          |  50 ---
 .../wikipedia/task/WikipediaFeedStreamTask.java |  43 ---
 .../task/WikipediaParserStreamTask.java         |  98 ------
 .../task/WikipediaStatsStreamTask.java          |  92 -----
 src/main/assembly/src.xml                       |  81 +++++
 src/main/config/wikipedia-feed.properties       |  44 +++
 src/main/config/wikipedia-parser.properties     |  52 +++
 src/main/config/wikipedia-stats.properties      |  57 ++++
 .../wikipedia/system/WikipediaConsumer.java     |  77 +++++
 .../wikipedia/system/WikipediaFeed.java         | 332 +++++++++++++++++++
 .../system/WikipediaSystemFactory.java          |  50 +++
 .../wikipedia/task/WikipediaFeedStreamTask.java |  43 +++
 .../task/WikipediaParserStreamTask.java         |  98 ++++++
 .../task/WikipediaStatsStreamTask.java          |  92 +++++
 src/main/resources/log4j.xml                    |  39 +++
 27 files changed, 1077 insertions(+), 1235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 1898309..0435c14 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@ target/
 *.iws
 */.cache
 deploy
+*.swp

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index 4324c92..25b2ec4 100755
--- a/bin/grid
+++ b/bin/grid
@@ -35,8 +35,8 @@ DOWNLOAD_CACHE_DIR=$HOME/.samza/download
 COMMAND=$1
 SYSTEM=$2
 
-DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
-DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.2.0/hadoop-2.2.0.tar.gz
+DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
+DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz
 DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
 
 bootstrap() {
@@ -63,7 +63,7 @@ install_zookeeper() {
 
 install_yarn() {
   mkdir -p "$DEPLOY_ROOT_DIR"
-  install yarn $DOWNLOAD_YARN hadoop-2.2.0
+  install yarn $DOWNLOAD_YARN hadoop-2.4.0
   cp "$BASE_DIR/conf/yarn-site.xml" "$DEPLOY_ROOT_DIR/yarn/etc/hadoop/yarn-site.xml"
   if [ ! -f "$HOME/.samza/conf/yarn-site.xml" ]; then
     mkdir -p "$HOME/.samza/conf"
@@ -73,7 +73,7 @@ install_yarn() {
 
 install_kafka() {
   mkdir -p "$DEPLOY_ROOT_DIR"
-  install kafka $DOWNLOAD_KAFKA kafka_2.9.2-0.8.1.1
+  install kafka $DOWNLOAD_KAFKA kafka_2.10-0.8.1.1
   # have to use SIGTERM since nohup on appears to ignore SIGINT
   # and Kafka switched to SIGINT in KAFKA-1031.
   sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 90f6c03..0891177 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,99 +25,102 @@ under the License.
     <maven>3.0.0</maven>
   </prerequisites>
 
-  <groupId>samza</groupId>
-  <artifactId>samza-example-parent</artifactId>
-  <version>0.7.0</version>
-  <packaging>pom</packaging>
-  <name>Samza Parent</name>
+  <groupId>org.apache.samza</groupId>
+  <artifactId>hello-samza</artifactId>
+  <version>0.8.0</version>
+  <packaging>jar</packaging>
+  <name>Samza Example</name>
   <description>
     Samza is a stream processing system. Think of it as Map-Reduce for streams.
   </description>
-  <url>https://github.com/linkedin/hello-samza</url>
+  <url>https://samza.incubator.apache.org/</url>
 
-  <dependencyManagement>
-    <dependencies>
-      <dependency>
-        <groupId>samza</groupId>
-        <artifactId>samza-wikipedia</artifactId>
-        <version>0.7.0</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.samza</groupId>
-        <artifactId>samza-api</artifactId>
-        <version>${samza.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.samza</groupId>
-        <artifactId>samza-core_2.10</artifactId>
-        <version>${samza.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.samza</groupId>
-        <artifactId>samza-serializers_2.10</artifactId>
-        <version>${samza.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.samza</groupId>
-        <artifactId>samza-shell</artifactId>
-        <classifier>dist</classifier>
-        <type>tgz</type>
-        <version>${samza.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.samza</groupId>
-        <artifactId>samza-yarn_2.10</artifactId>
-        <version>${samza.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.samza</groupId>
-        <artifactId>samza-kv_2.10</artifactId>
-        <version>${samza.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.samza</groupId>
-        <artifactId>samza-kafka_2.10</artifactId>
-        <version>${samza.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.kafka</groupId>
-        <artifactId>kafka_2.10</artifactId>
-        <version>0.8.1</version>
-      </dependency>
-      <dependency>
-        <groupId>org.schwering</groupId>
-        <artifactId>irclib</artifactId>
-        <version>1.10</version>
-      </dependency>
-      <dependency>
-        <groupId>org.slf4j</groupId>
-        <artifactId>slf4j-api</artifactId>
-        <version>1.6.2</version>
-      </dependency>
-      <dependency>
-        <groupId>org.slf4j</groupId>
-        <artifactId>slf4j-log4j12</artifactId>
-        <version>1.6.2</version>
-      </dependency>
-      <dependency>
-        <groupId>org.codehaus.jackson</groupId>
-        <artifactId>jackson-jaxrs</artifactId>
-        <version>1.8.5</version>
-      </dependency>
-    </dependencies>
-  </dependencyManagement>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-api</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-core_2.10</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-log4j</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-serializers_2.10</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-shell</artifactId>
+      <classifier>dist</classifier>
+      <type>tgz</type>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-yarn_2.10</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-kv_2.10</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-kv-rocksdb_2.10</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-kafka_2.10</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>0.8.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.schwering</groupId>
+      <artifactId>irclib</artifactId>
+      <version>1.10</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.6.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>1.6.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-jaxrs</artifactId>
+      <version>1.8.5</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>2.4.0</version>
+    </dependency>
+  </dependencies>
 
   <properties>
     <!-- maven specific properties -->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <samza.version>0.7.0</samza.version>
+    <samza.version>0.8.0</samza.version>
   </properties>
 
-  <modules>
-    <module>samza-job-package</module>
-    <module>samza-wikipedia</module>
-  </modules>
-
   <developers>
     <developer>
       <name>Chris Riccomini</name>
@@ -159,11 +162,6 @@ under the License.
       <name>Scala-tools Maven2 Repository</name>
       <url>https://oss.sonatype.org/content/groups/scala-tools</url>
     </repository>
-    <!-- for zkclient -->
-    <repository>
-      <id>sonatype</id>
-      <url>http://oss.sonatype.org/content/groups/public</url>
-    </repository>
   </repositories>
 
   <pluginRepositories>
@@ -183,6 +181,7 @@ under the License.
           <version>0.9</version>
           <configuration>
             <excludes>
+              <exclude>**/target/**</exclude>
               <exclude>*.json</exclude>
               <exclude>.vagrant/**</exclude>
               <exclude>.git/**</exclude>
@@ -193,6 +192,7 @@ under the License.
               <exclude>.gitignore</exclude>
               <exclude>**/.cache/**</exclude>
               <exclude>deploy/**</exclude>
+              <exclude>**/.project</exclude>
             </excludes>
           </configuration>
         </plugin>
@@ -221,6 +221,25 @@ under the License.
           </execution>
         </executions>
       </plugin>
+      <!-- plugin to build the tar.gz file filled with examples -->
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.3</version>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assembly/src.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/pom.xml
----------------------------------------------------------------------
diff --git a/samza-job-package/pom.xml b/samza-job-package/pom.xml
deleted file mode 100644
index 169a28f..0000000
--- a/samza-job-package/pom.xml
+++ /dev/null
@@ -1,121 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>samza</groupId>
-    <artifactId>samza-example-parent</artifactId>
-    <version>0.7.0</version>
-  </parent>
-
-  <artifactId>samza-job-package</artifactId>
-  <name>Samza Job Package</name>
-  <packaging>jar</packaging>
-
-  <dependencies>
-    <dependency>
-      <groupId>samza</groupId>
-      <artifactId>samza-wikipedia</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.samza</groupId>
-      <artifactId>samza-shell</artifactId>
-      <classifier>dist</classifier>
-      <type>tgz</type>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.samza</groupId>
-      <artifactId>samza-core_2.10</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.samza</groupId>
-      <artifactId>samza-serializers_2.10</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.samza</groupId>
-      <artifactId>samza-yarn_2.10</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.samza</groupId>
-      <artifactId>samza-kv_2.10</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.samza</groupId>
-      <artifactId>samza-kafka_2.10</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <version>2.2.0</version>
-      <scope>runtime</scope>
-    </dependency>
-  </dependencies>
-
-  <licenses>
-    <license>
-      <name>Apache License 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
-      <distribution>repo</distribution>
-    </license>
-  </licenses>
-
-  <build>
-    <plugins>
-      <!-- plugin to build the tar.gz file filled with examples -->
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <version>2.3</version>
-        <configuration>
-          <descriptors>
-            <descriptor>src/main/assembly/src.xml</descriptor>
-          </descriptors>
-        </configuration>
-        <executions>
-          <execution>
-            <id>make-assembly</id>
-            <phase>package</phase>
-            <goals>
-              <goal>single</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/samza-job-package/src/main/assembly/src.xml b/samza-job-package/src/main/assembly/src.xml
deleted file mode 100644
index 14a5ad5..0000000
--- a/samza-job-package/src/main/assembly/src.xml
+++ /dev/null
@@ -1,80 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
-  license agreements. See the NOTICE file distributed with this work for additional 
-  information regarding copyright ownership. The ASF licenses this file to 
-  you under the Apache License, Version 2.0 (the "License"); you may not use 
-  this file except in compliance with the License. You may obtain a copy of 
-  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-  by applicable law or agreed to in writing, software distributed under the 
-  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
-  OF ANY KIND, either express or implied. See the License for the specific 
-  language governing permissions and limitations under the License. -->
-
-<assembly
-  xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-  <id>dist</id>
-  <formats>
-    <format>tar.gz</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <fileSets>
-    <fileSet>
-      <directory>${basedir}/..</directory>
-      <includes>
-        <include>README*</include>
-        <include>LICENSE*</include>
-        <include>NOTICE*</include>
-      </includes>
-    </fileSet>
-  </fileSets>
-  <files>
-    <file>
-      <source>${basedir}/src/main/resources/log4j.xml</source>
-      <outputDirectory>lib</outputDirectory>
-    </file>
-    <!-- filtered=true, so we do variable expansion so the yarn package path 
-      always points to the correct spot on any machine -->
-    <file>
-      <source>${basedir}/src/main/config/wikipedia-feed.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
-    </file>
-    <file>
-      <source>${basedir}/src/main/config/wikipedia-parser.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
-    </file>
-    <file>
-      <source>${basedir}/src/main/config/wikipedia-stats.properties</source>
-      <outputDirectory>config</outputDirectory>
-      <filtered>true</filtered>
-    </file>
-  </files>
-  <dependencySets>
-    <dependencySet>
-      <outputDirectory>bin</outputDirectory>
-      <includes>
-        <include>org.apache.samza:samza-shell:tgz:dist:*</include>
-      </includes>
-      <fileMode>0744</fileMode>
-      <unpack>true</unpack>
-    </dependencySet>
-    <dependencySet>
-      <outputDirectory>lib</outputDirectory>
-      <includes>
-        <include>org.apache.samza:samza-core_2.10</include>
-        <include>org.apache.samza:samza-kafka_2.10</include>
-        <include>org.apache.samza:samza-serializers_2.10</include>
-        <include>org.apache.samza:samza-yarn_2.10</include>
-        <include>org.apache.samza:samza-kv_2.10</include>
-        <include>org.slf4j:slf4j-log4j12</include>
-        <include>samza:samza-wikipedia</include>
-        <include>org.apache.kafka:kafka_2.10</include>
-        <include>org.apache.hadoop:hadoop-hdfs</include>
-      </includes>
-      <useTransitiveFiltering>true</useTransitiveFiltering>
-    </dependencySet>
-  </dependencySets>
-</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/config/wikipedia-feed.properties
----------------------------------------------------------------------
diff --git a/samza-job-package/src/main/config/wikipedia-feed.properties b/samza-job-package/src/main/config/wikipedia-feed.properties
deleted file mode 100644
index c498c16..0000000
--- a/samza-job-package/src/main/config/wikipedia-feed.properties
+++ /dev/null
@@ -1,44 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Job
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-job.name=wikipedia-feed
-
-# YARN
-yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
-
-# Task
-task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask
-task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews
-
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-
-# Wikipedia System
-systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
-systems.wikipedia.host=irc.wikimedia.org
-systems.wikipedia.port=6667
-
-# Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=json
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.producer.metadata.broker.list=localhost:9092
-systems.kafka.producer.producer.type=sync
-# Normally, we'd set this much higher, but we want things to look snappy in the demo.
-systems.kafka.producer.batch.num.messages=1

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/config/wikipedia-parser.properties
----------------------------------------------------------------------
diff --git a/samza-job-package/src/main/config/wikipedia-parser.properties b/samza-job-package/src/main/config/wikipedia-parser.properties
deleted file mode 100644
index 38575b6..0000000
--- a/samza-job-package/src/main/config/wikipedia-parser.properties
+++ /dev/null
@@ -1,52 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Job
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-job.name=wikipedia-parser
-
-# YARN
-yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
-
-# Task
-task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
-task.inputs=kafka.wikipedia-raw
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka
-# Normally, this would be 3, but we have only one broker.
-task.checkpoint.replication.factor=1
-
-# Metrics
-metrics.reporters=snapshot,jmx
-metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
-metrics.reporter.snapshot.stream=kafka.metrics
-metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
-
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
-
-# Systems
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=json
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.consumer.auto.offset.reset=largest
-systems.kafka.producer.metadata.broker.list=localhost:9092
-systems.kafka.producer.producer.type=sync
-# Normally, we'd set this much higher, but we want things to look snappy in the demo.
-systems.kafka.producer.batch.num.messages=1
-systems.kafka.streams.metrics.samza.msg.serde=metrics

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/config/wikipedia-stats.properties
----------------------------------------------------------------------
diff --git a/samza-job-package/src/main/config/wikipedia-stats.properties b/samza-job-package/src/main/config/wikipedia-stats.properties
deleted file mode 100644
index be0c749..0000000
--- a/samza-job-package/src/main/config/wikipedia-stats.properties
+++ /dev/null
@@ -1,53 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Job
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-job.name=wikipedia-stats
-
-# YARN
-yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
-
-# Task
-task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask
-task.inputs=kafka.wikipedia-edits
-task.window.ms=10000
-
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
-
-# Systems
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=json
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.consumer.auto.offset.reset=largest
-systems.kafka.producer.metadata.broker.list=localhost:9092
-systems.kafka.producer.producer.type=sync
-# Normally, we'd set this much higher, but we want things to look snappy in the demo.
-systems.kafka.producer.batch.num.messages=1
-
-# Key-value storage
-stores.wikipedia-stats.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
-stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
-stores.wikipedia-stats.key.serde=string
-stores.wikipedia-stats.msg.serde=integer
-
-# Normally, we'd set this much higher, but we want things to look snappy in the demo.
-stores.wikipedia-stats.write.batch.size=0
-stores.wikipedia-stats.object.cache.size=0

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-job-package/src/main/resources/log4j.xml b/samza-job-package/src/main/resources/log4j.xml
deleted file mode 100644
index a937165..0000000
--- a/samza-job-package/src/main/resources/log4j.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-  <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender">
-     <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
-     <param name="DatePattern" value="'.'yyyy-MM-dd" />
-     <layout class="org.apache.log4j.PatternLayout">
-      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
-     </layout>
-  </appender>
-  <root>
-    <priority value="info" />
-    <appender-ref ref="RollingAppender"/>
-  </root>
-</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/pom.xml
----------------------------------------------------------------------
diff --git a/samza-wikipedia/pom.xml b/samza-wikipedia/pom.xml
deleted file mode 100644
index 20d94ed..0000000
--- a/samza-wikipedia/pom.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>samza</groupId>
-    <artifactId>samza-example-parent</artifactId>
-    <version>0.7.0</version>
-  </parent>
-
-  <artifactId>samza-wikipedia</artifactId>
-  <name>Samza Wikipedia Example</name>
-  <packaging>jar</packaging>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.schwering</groupId>
-      <artifactId>irclib</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.samza</groupId>
-      <artifactId>samza-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.samza</groupId>
-      <artifactId>samza-kv_2.10</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-jaxrs</artifactId>
-    </dependency>
-  </dependencies>
-
-  <licenses>
-    <license>
-      <name>Apache License 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
-      <distribution>repo</distribution>
-    </license>
-  </licenses>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java
----------------------------------------------------------------------
diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java
deleted file mode 100644
index f156c3b..0000000
--- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.system;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.samza.Partition;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.BlockingEnvelopeMap;
-import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
-import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedListener;
-
-public class WikipediaConsumer extends BlockingEnvelopeMap implements WikipediaFeedListener {
-  private final List<String> channels;
-  private final String systemName;
-  private final WikipediaFeed feed;
-
-  public WikipediaConsumer(String systemName, WikipediaFeed feed, MetricsRegistry registry) {
-    this.channels = new ArrayList<String>();
-    this.systemName = systemName;
-    this.feed = feed;
-  }
-
-  public void onEvent(final WikipediaFeedEvent event) {
-    SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, event.getChannel(), new Partition(0));
-
-    try {
-      put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, event));
-    } catch (Exception e) {
-      System.err.println(e);
-    }
-  }
-
-  @Override
-  public void register(SystemStreamPartition systemStreamPartition, String startingOffset) {
-    super.register(systemStreamPartition, startingOffset);
-
-    channels.add(systemStreamPartition.getStream());
-  }
-
-  @Override
-  public void start() {
-    feed.start();
-
-    for (String channel : channels) {
-      feed.listen(channel, this);
-    }
-  }
-
-  @Override
-  public void stop() {
-    for (String channel : channels) {
-      feed.unlisten(channel, this);
-    }
-
-    feed.stop();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java
----------------------------------------------------------------------
diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java
deleted file mode 100644
index 16e302e..0000000
--- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.system;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import org.apache.samza.SamzaException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.schwering.irc.lib.IRCConnection;
-import org.schwering.irc.lib.IRCEventListener;
-import org.schwering.irc.lib.IRCModeParser;
-import org.schwering.irc.lib.IRCUser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WikipediaFeed {
-  private static final Logger log = LoggerFactory.getLogger(WikipediaFeed.class);
-  private static final Random random = new Random();
-  private static final ObjectMapper jsonMapper = new ObjectMapper();
-
-  private final Map<String, Set<WikipediaFeedListener>> channelListeners;
-  private final String host;
-  private final int port;
-  private final IRCConnection conn;
-  private final String nick;
-
-  public WikipediaFeed(String host, int port) {
-    this.channelListeners = new HashMap<String, Set<WikipediaFeedListener>>();
-    this.host = host;
-    this.port = port;
-    this.nick = "samza-bot-" + Math.abs(random.nextInt());
-    this.conn = new IRCConnection(host, new int[] { port }, "", nick, nick, nick);
-    this.conn.addIRCEventListener(new WikipediaFeedIrcListener());
-    this.conn.setEncoding("UTF-8");
-    this.conn.setPong(true);
-    this.conn.setColors(false);
-  }
-
-  public void start() {
-    try {
-      this.conn.connect();
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to connect to " + host + ":" + port + ".", e);
-    }
-  }
-
-  public void stop() {
-    this.conn.interrupt();
-
-    try {
-      this.conn.join();
-    } catch (InterruptedException e) {
-      throw new RuntimeException("Interrupted while trying to shutdown IRC connection for " + host + ":" + port, e);
-    }
-
-    if (this.conn.isAlive()) {
-      throw new RuntimeException("Unable to shutdown IRC connection for " + host + ":" + port);
-    }
-  }
-
-  public void listen(String channel, WikipediaFeedListener listener) {
-    Set<WikipediaFeedListener> listeners = channelListeners.get(channel);
-
-    if (listeners == null) {
-      listeners = new HashSet<WikipediaFeedListener>();
-      channelListeners.put(channel, listeners);
-      join(channel);
-    }
-
-    listeners.add(listener);
-  }
-
-  public void unlisten(String channel, WikipediaFeedListener listener) {
-    Set<WikipediaFeedListener> listeners = channelListeners.get(channel);
-
-    if (listeners == null) {
-      throw new RuntimeException("Trying to unlisten to a channel that has no listeners in it.");
-    } else if (!listeners.contains(listener)) {
-      throw new RuntimeException("Trying to unlisten to a channel that listener is not listening to.");
-    }
-
-    listeners.remove(listener);
-
-    if (listeners.size() == 0) {
-      leave(channel);
-    }
-  }
-
-  public void join(String channel) {
-    conn.send("JOIN " + channel);
-  }
-
-  public void leave(String channel) {
-    conn.send("PART " + channel);
-  }
-
-  public class WikipediaFeedIrcListener implements IRCEventListener {
-    public void onRegistered() {
-      log.info("Connected");
-    }
-
-    public void onDisconnected() {
-      log.info("Disconnected");
-    }
-
-    public void onError(String msg) {
-      log.info("Error: " + msg);
-    }
-
-    public void onError(int num, String msg) {
-      log.info("Error #" + num + ": " + msg);
-    }
-
-    public void onInvite(String chan, IRCUser u, String nickPass) {
-      log.info(chan + "> " + u.getNick() + " invites " + nickPass);
-    }
-
-    public void onJoin(String chan, IRCUser u) {
-      log.info(chan + "> " + u.getNick() + " joins");
-    }
-
-    public void onKick(String chan, IRCUser u, String nickPass, String msg) {
-      log.info(chan + "> " + u.getNick() + " kicks " + nickPass);
-    }
-
-    public void onMode(IRCUser u, String nickPass, String mode) {
-      log.info("Mode: " + u.getNick() + " sets modes " + mode + " " + nickPass);
-    }
-
-    public void onMode(String chan, IRCUser u, IRCModeParser mp) {
-      log.info(chan + "> " + u.getNick() + " sets mode: " + mp.getLine());
-    }
-
-    public void onNick(IRCUser u, String nickNew) {
-      log.info("Nick: " + u.getNick() + " is now known as " + nickNew);
-    }
-
-    public void onNotice(String target, IRCUser u, String msg) {
-      log.info(target + "> " + u.getNick() + " (notice): " + msg);
-    }
-
-    public void onPart(String chan, IRCUser u, String msg) {
-      log.info(chan + "> " + u.getNick() + " parts");
-    }
-
-    public void onPrivmsg(String chan, IRCUser u, String msg) {
-      Set<WikipediaFeedListener> listeners = channelListeners.get(chan);
-
-      if (listeners != null) {
-        WikipediaFeedEvent event = new WikipediaFeedEvent(System.currentTimeMillis(), chan, u.getNick(), msg);
-
-        for (WikipediaFeedListener listener : listeners) {
-          listener.onEvent(event);
-        }
-      }
-
-      log.debug(chan + "> " + u.getNick() + ": " + msg);
-    }
-
-    public void onQuit(IRCUser u, String msg) {
-      log.info("Quit: " + u.getNick());
-    }
-
-    public void onReply(int num, String value, String msg) {
-      log.info("Reply #" + num + ": " + value + " " + msg);
-    }
-
-    public void onTopic(String chan, IRCUser u, String topic) {
-      log.info(chan + "> " + u.getNick() + " changes topic into: " + topic);
-    }
-
-    public void onPing(String p) {
-    }
-
-    public void unknown(String a, String b, String c, String d) {
-      log.warn("UNKNOWN: " + a + " " + b + " " + c + " " + d);
-    }
-  }
-
-  public static interface WikipediaFeedListener {
-    void onEvent(WikipediaFeedEvent event);
-  }
-
-  public static final class WikipediaFeedEvent {
-    private final long time;
-    private final String channel;
-    private final String source;
-    private final String rawEvent;
-
-    public WikipediaFeedEvent(long time, String channel, String source, String rawEvent) {
-      this.time = time;
-      this.channel = channel;
-      this.source = source;
-      this.rawEvent = rawEvent;
-    }
-
-    public WikipediaFeedEvent(Map<String, Object> jsonObject) {
-      this((Long) jsonObject.get("time"), (String) jsonObject.get("channel"), (String) jsonObject.get("source"), (String) jsonObject.get("raw"));
-    }
-
-    public long getTime() {
-      return time;
-    }
-
-    public String getChannel() {
-      return channel;
-    }
-
-    public String getSource() {
-      return source;
-    }
-
-    public String getRawEvent() {
-      return rawEvent;
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((channel == null) ? 0 : channel.hashCode());
-      result = prime * result + ((rawEvent == null) ? 0 : rawEvent.hashCode());
-      result = prime * result + ((source == null) ? 0 : source.hashCode());
-      result = prime * result + (int) (time ^ (time >>> 32));
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj)
-        return true;
-      if (obj == null)
-        return false;
-      if (getClass() != obj.getClass())
-        return false;
-      WikipediaFeedEvent other = (WikipediaFeedEvent) obj;
-      if (channel == null) {
-        if (other.channel != null)
-          return false;
-      } else if (!channel.equals(other.channel))
-        return false;
-      if (rawEvent == null) {
-        if (other.rawEvent != null)
-          return false;
-      } else if (!rawEvent.equals(other.rawEvent))
-        return false;
-      if (source == null) {
-        if (other.source != null)
-          return false;
-      } else if (!source.equals(other.source))
-        return false;
-      if (time != other.time)
-        return false;
-      return true;
-    }
-
-    @Override
-    public String toString() {
-      return "WikipediaFeedEvent [time=" + time + ", channel=" + channel + ", source=" + source + ", rawEvent=" + rawEvent + "]";
-    }
-
-    public String toJson() {
-      return toJson(this);
-    }
-
-    public static Map<String, Object> toMap(WikipediaFeedEvent event) {
-      Map<String, Object> jsonObject = new HashMap<String, Object>();
-
-      jsonObject.put("time", event.getTime());
-      jsonObject.put("channel", event.getChannel());
-      jsonObject.put("source", event.getSource());
-      jsonObject.put("raw", event.getRawEvent());
-
-      return jsonObject;
-    }
-
-    public static String toJson(WikipediaFeedEvent event) {
-      Map<String, Object> jsonObject = toMap(event);
-
-      try {
-        return jsonMapper.writeValueAsString(jsonObject);
-      } catch (Exception e) {
-        throw new SamzaException(e);
-      }
-    }
-
-    @SuppressWarnings("unchecked")
-    public static WikipediaFeedEvent fromJson(String json) {
-      try {
-        return new WikipediaFeedEvent((Map<String, Object>) jsonMapper.readValue(json, Map.class));
-      } catch (Exception e) {
-        throw new SamzaException(e);
-      }
-    }
-  }
-
-  public static void main(String[] args) throws InterruptedException {
-    WikipediaFeed feed = new WikipediaFeed("irc.wikimedia.org", 6667);
-    feed.start();
-
-    feed.listen("#en.wikipedia", new WikipediaFeedListener() {
-      @Override
-      public void onEvent(WikipediaFeedEvent event) {
-        System.out.println(event);
-      }
-    });
-
-    Thread.sleep(20000);
-    feed.stop();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
deleted file mode 100644
index d1612c9..0000000
--- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.system;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemProducer;
-import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
-
-public class WikipediaSystemFactory implements SystemFactory {
-  @Override
-  public SystemAdmin getAdmin(String systemName, Config config) {
-    return new SinglePartitionWithoutOffsetsSystemAdmin();
-  }
-
-  @Override
-  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
-    String host = config.get("systems." + systemName + ".host");
-    int port = config.getInt("systems." + systemName + ".port");
-    WikipediaFeed feed = new WikipediaFeed(host, port);
-
-    return new WikipediaConsumer(systemName, feed, registry);
-  }
-
-  @Override
-  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
-    throw new SamzaException("You can't produce to a Wikipedia feed! How about making some edits to a Wiki, instead?");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
deleted file mode 100644
index 07cd8ac..0000000
--- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.task;
-
-import java.util.Map;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskCoordinator;
-import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
-
-/**
- * This task is very simple. All it does is take messages that it receives, and
- * sends them to a Kafka topic called wikipedia-raw.
- */
-public class WikipediaFeedStreamTask implements StreamTask {
-  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-raw");
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
-    Map<String, Object> outgoingMap = WikipediaFeedEvent.toMap((WikipediaFeedEvent) envelope.getMessage());
-    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
deleted file mode 100644
index 0505f58..0000000
--- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.task;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskCoordinator;
-import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
-
-public class WikipediaParserStreamTask implements StreamTask {
-  @SuppressWarnings("unchecked")
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
-    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
-    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
-
-    try {
-      Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
-
-      parsedJsonObject.put("channel", event.getChannel());
-      parsedJsonObject.put("source", event.getSource());
-      parsedJsonObject.put("time", event.getTime());
-
-      collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
-    } catch (Exception e) {
-      System.err.println("Unable to parse line: " + event);
-    }
-  }
-
-  public static Map<String, Object> parse(String line) {
-    Pattern p = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)");
-    Matcher m = p.matcher(line);
-
-    if (m.find() && m.groupCount() == 6) {
-      String title = m.group(1);
-      String flags = m.group(2);
-      String diffUrl = m.group(3);
-      String user = m.group(4);
-      int byteDiff = Integer.parseInt(m.group(5));
-      String summary = m.group(6);
-
-      Map<String, Boolean> flagMap = new HashMap<String, Boolean>();
-
-      flagMap.put("is-minor", flags.contains("M"));
-      flagMap.put("is-new", flags.contains("N"));
-      flagMap.put("is-unpatrolled", flags.contains("!"));
-      flagMap.put("is-bot-edit", flags.contains("B"));
-      flagMap.put("is-special", title.startsWith("Special:"));
-      flagMap.put("is-talk", title.startsWith("Talk:"));
-
-      Map<String, Object> root = new HashMap<String, Object>();
-
-      root.put("title", title);
-      root.put("user", user);
-      root.put("unparsed-flags", flags);
-      root.put("diff-bytes", byteDiff);
-      root.put("diff-url", diffUrl);
-      root.put("summary", summary);
-      root.put("flags", flagMap);
-
-      return root;
-    } else {
-      throw new IllegalArgumentException();
-    }
-  }
-
-  public static void main(String[] args) {
-    String[] lines = new String[] { "[[Wikipedia talk:Articles for creation/Lords of War]]  http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]", "[[David Shepard (surgeon)]] M http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * Jacobsievers * (+115) /* American Revolution (1775�1783) */  Added to note regarding David Shepard's brothers" };
-
-    for (String line : lines) {
-      System.out.println(parse(line));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
deleted file mode 100644
index 60fd93d..0000000
--- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.task;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.config.Config;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-public class WikipediaStatsStreamTask implements StreamTask, InitableTask, WindowableTask {
-  private int edits = 0;
-  private int byteDiff = 0;
-  private Set<String> titles = new HashSet<String>();
-  private Map<String, Integer> counts = new HashMap<String, Integer>();
-  private KeyValueStore<String, Integer> store;
-
-  public void init(Config config, TaskContext context) {
-    this.store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats");
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
-    Map<String, Object> edit = (Map<String, Object>) envelope.getMessage();
-    Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags");
-
-    Integer editsAllTime = store.get("count-edits-all-time");
-    if (editsAllTime == null) editsAllTime = 0;
-    store.put("count-edits-all-time", editsAllTime + 1);
-
-    edits += 1;
-    titles.add((String) edit.get("title"));
-    byteDiff += (Integer) edit.get("diff-bytes");
-
-    for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
-      if (Boolean.TRUE.equals(flag.getValue())) {
-        Integer count = counts.get(flag.getKey());
-
-        if (count == null) {
-          count = 0;
-        }
-
-        count += 1;
-        counts.put(flag.getKey(), count);
-      }
-    }
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) {
-    counts.put("edits", edits);
-    counts.put("bytes-added", byteDiff);
-    counts.put("unique-titles", titles.size());
-    counts.put("edits-all-time", store.get("count-edits-all-time"));
-
-    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-stats"), counts));
-
-    // Reset counts after windowing.
-    edits = 0;
-    byteDiff = 0;
-    titles = new HashSet<String>();
-    counts = new HashMap<String, Integer>();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
new file mode 100644
index 0000000..8a8556d
--- /dev/null
+++ b/src/main/assembly/src.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  you under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+
+<assembly
+  xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>dist</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}</directory>
+      <includes>
+        <include>README*</include>
+        <include>LICENSE*</include>
+        <include>NOTICE*</include>
+      </includes>
+    </fileSet>
+  </fileSets>
+  <files>
+    <file>
+      <source>${basedir}/src/main/resources/log4j.xml</source>
+      <outputDirectory>lib</outputDirectory>
+    </file>
+    <!-- filtered=true, so we do variable expansion so the yarn package path
+      always points to the correct spot on any machine -->
+    <file>
+      <source>${basedir}/src/main/config/wikipedia-feed.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
+    <file>
+      <source>${basedir}/src/main/config/wikipedia-parser.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
+    <file>
+      <source>${basedir}/src/main/config/wikipedia-stats.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
+  </files>
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>bin</outputDirectory>
+      <includes>
+        <include>org.apache.samza:samza-shell:tgz:dist:*</include>
+      </includes>
+      <fileMode>0744</fileMode>
+      <unpack>true</unpack>
+    </dependencySet>
+    <dependencySet>
+      <outputDirectory>lib</outputDirectory>
+      <includes>
+        <include>org.apache.samza:samza-core_2.10</include>
+        <include>org.apache.samza:samza-kafka_2.10</include>
+        <include>org.apache.samza:samza-serializers_2.10</include>
+        <include>org.apache.samza:samza-yarn_2.10</include>
+        <include>org.apache.samza:samza-kv-rocksdb_2.10</include>
+        <include>org.apache.samza:samza-log4j</include>
+        <include>org.apache.samza:hello-samza</include>
+        <include>org.slf4j:slf4j-log4j12</include>
+        <include>org.apache.kafka:kafka_2.10</include>
+        <include>org.apache.hadoop:hadoop-hdfs</include>
+      </includes>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+    </dependencySet>
+  </dependencySets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/config/wikipedia-feed.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-feed.properties b/src/main/config/wikipedia-feed.properties
new file mode 100644
index 0000000..c498c16
--- /dev/null
+++ b/src/main/config/wikipedia-feed.properties
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=wikipedia-feed
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask
+task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.metadata.broker.list=localhost:9092
+systems.kafka.producer.producer.type=sync
+# Normally, we'd set this much higher, but we want things to look snappy in the demo.
+systems.kafka.producer.batch.num.messages=1

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/config/wikipedia-parser.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-parser.properties b/src/main/config/wikipedia-parser.properties
new file mode 100644
index 0000000..38575b6
--- /dev/null
+++ b/src/main/config/wikipedia-parser.properties
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=wikipedia-parser
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
+task.inputs=kafka.wikipedia-raw
+task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
+task.checkpoint.system=kafka
+# Normally, this would be 3, but we have only one broker.
+task.checkpoint.replication.factor=1
+
+# Metrics
+metrics.reporters=snapshot,jmx
+metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+metrics.reporter.snapshot.stream=kafka.metrics
+metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
+
+# Systems
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.consumer.auto.offset.reset=largest
+systems.kafka.producer.metadata.broker.list=localhost:9092
+systems.kafka.producer.producer.type=sync
+# Normally, we'd set this much higher, but we want things to look snappy in the demo.
+systems.kafka.producer.batch.num.messages=1
+systems.kafka.streams.metrics.samza.msg.serde=metrics

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/config/wikipedia-stats.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties
new file mode 100644
index 0000000..69eff90
--- /dev/null
+++ b/src/main/config/wikipedia-stats.properties
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=wikipedia-stats
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask
+task.inputs=kafka.wikipedia-edits
+task.window.ms=10000
+task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
+task.checkpoint.system=kafka
+# Normally, this would be 3, but we have only one broker.
+task.checkpoint.replication.factor=1
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Systems
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.consumer.auto.offset.reset=largest
+systems.kafka.producer.metadata.broker.list=localhost:9092
+systems.kafka.producer.producer.type=sync
+# Normally, we'd set this much higher, but we want things to look snappy in the demo.
+systems.kafka.producer.batch.num.messages=1
+
+# Key-value storage
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
+stores.wikipedia-stats.key.serde=string
+stores.wikipedia-stats.msg.serde=integer
+
+# Normally, we'd set this much higher, but we want things to look snappy in the demo.
+stores.wikipedia-stats.write.batch.size=0
+stores.wikipedia-stats.object.cache.size=0

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java b/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java
new file mode 100644
index 0000000..f156c3b
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.system;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.Partition;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedListener;
+
+public class WikipediaConsumer extends BlockingEnvelopeMap implements WikipediaFeedListener {
+  private final List<String> channels;
+  private final String systemName;
+  private final WikipediaFeed feed;
+
+  public WikipediaConsumer(String systemName, WikipediaFeed feed, MetricsRegistry registry) {
+    this.channels = new ArrayList<String>();
+    this.systemName = systemName;
+    this.feed = feed;
+  }
+
+  public void onEvent(final WikipediaFeedEvent event) {
+    SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, event.getChannel(), new Partition(0));
+
+    try {
+      put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, event));
+    } catch (Exception e) {
+      System.err.println(e);
+    }
+  }
+
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String startingOffset) {
+    super.register(systemStreamPartition, startingOffset);
+
+    channels.add(systemStreamPartition.getStream());
+  }
+
+  @Override
+  public void start() {
+    feed.start();
+
+    for (String channel : channels) {
+      feed.listen(channel, this);
+    }
+  }
+
+  @Override
+  public void stop() {
+    for (String channel : channels) {
+      feed.unlisten(channel, this);
+    }
+
+    feed.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java b/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java
new file mode 100644
index 0000000..16e302e
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.system;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.samza.SamzaException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.schwering.irc.lib.IRCConnection;
+import org.schwering.irc.lib.IRCEventListener;
+import org.schwering.irc.lib.IRCModeParser;
+import org.schwering.irc.lib.IRCUser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Minimal IRC client that joins channels on demand and forwards every channel PRIVMSG to the
+ * listeners registered for that channel, wrapped in a {@link WikipediaFeedEvent}.
+ *
+ * <p>The underlying {@link IRCConnection} runs as its own thread ({@link #stop()} interrupts and
+ * joins it) and delivers IRC events to {@link WikipediaFeedIrcListener}, presumably from that
+ * thread. Meanwhile {@link #listen} and {@link #unlisten} are called by the feed's owner. The
+ * listener registry therefore uses concurrent collections, and its mutators are synchronized.
+ */
+public class WikipediaFeed {
+  private static final Logger log = LoggerFactory.getLogger(WikipediaFeed.class);
+  private static final Random random = new Random();
+  private static final ObjectMapper jsonMapper = new ObjectMapper();
+
+  // channel name -> listeners for that channel. Read by IRC callbacks while listen()/unlisten()
+  // modify it, so both the map and the per-channel sets are thread-safe collections.
+  private final Map<String, Set<WikipediaFeedListener>> channelListeners;
+  private final String host;
+  private final int port;
+  private final IRCConnection conn;
+  private final String nick;
+
+  /**
+   * Configures (but does not open) a connection to the given IRC server under a random
+   * {@code samza-bot-N} nick. The connection is opened by {@link #start()}.
+   *
+   * @param host IRC server host name
+   * @param port IRC server port
+   */
+  public WikipediaFeed(String host, int port) {
+    this.channelListeners = new ConcurrentHashMap<String, Set<WikipediaFeedListener>>();
+    this.host = host;
+    this.port = port;
+    // nextInt(bound) is never negative, unlike Math.abs(nextInt()) for Integer.MIN_VALUE.
+    this.nick = "samza-bot-" + random.nextInt(Integer.MAX_VALUE);
+    this.conn = new IRCConnection(host, new int[] { port }, "", nick, nick, nick);
+    this.conn.addIRCEventListener(new WikipediaFeedIrcListener());
+    this.conn.setEncoding("UTF-8");
+    this.conn.setPong(true);
+    this.conn.setColors(false);
+  }
+
+  /**
+   * Opens the IRC connection.
+   *
+   * @throws RuntimeException wrapping the IOException if the connection cannot be established
+   */
+  public void start() {
+    try {
+      this.conn.connect();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to connect to " + host + ":" + port + ".", e);
+    }
+  }
+
+  /**
+   * Interrupts the IRC connection thread and waits for it to terminate.
+   *
+   * @throws RuntimeException if the calling thread is interrupted while waiting (its interrupt
+   *     flag is restored first), or if the connection thread is still alive afterwards
+   */
+  public void stop() {
+    this.conn.interrupt();
+
+    try {
+      this.conn.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while trying to shutdown IRC connection for " + host + ":" + port, e);
+    }
+
+    if (this.conn.isAlive()) {
+      throw new RuntimeException("Unable to shutdown IRC connection for " + host + ":" + port);
+    }
+  }
+
+  /**
+   * Registers {@code listener} for messages on {@code channel}. The channel is JOINed when this
+   * is its first listener. Registering the same listener twice has no further effect.
+   */
+  public synchronized void listen(String channel, WikipediaFeedListener listener) {
+    Set<WikipediaFeedListener> listeners = channelListeners.get(channel);
+
+    if (listeners == null) {
+      listeners = new CopyOnWriteArraySet<WikipediaFeedListener>();
+      // Add the listener before sending JOIN so early channel messages are not dropped.
+      listeners.add(listener);
+      channelListeners.put(channel, listeners);
+      join(channel);
+    } else {
+      listeners.add(listener);
+    }
+  }
+
+  /**
+   * Removes {@code listener} from {@code channel}. When the last listener is removed, the channel
+   * is PARTed and forgotten, so a later {@link #listen} on it JOINs again.
+   *
+   * @throws RuntimeException if the channel has no listeners, or {@code listener} is not one of them
+   */
+  public synchronized void unlisten(String channel, WikipediaFeedListener listener) {
+    Set<WikipediaFeedListener> listeners = channelListeners.get(channel);
+
+    if (listeners == null) {
+      throw new RuntimeException("Trying to unlisten to a channel that has no listeners in it.");
+    } else if (!listeners.contains(listener)) {
+      throw new RuntimeException("Trying to unlisten to a channel that listener is not listening to.");
+    }
+
+    listeners.remove(listener);
+
+    if (listeners.isEmpty()) {
+      // Without removing the entry, a later listen() would find the empty set and never re-JOIN.
+      channelListeners.remove(channel);
+      leave(channel);
+    }
+  }
+
+  /** Sends a raw IRC {@code JOIN} for {@code channel}. */
+  public void join(String channel) {
+    conn.send("JOIN " + channel);
+  }
+
+  /** Sends a raw IRC {@code PART} for {@code channel}. */
+  public void leave(String channel) {
+    conn.send("PART " + channel);
+  }
+
+  /** Logs IRC protocol events and dispatches channel messages to the registered listeners. */
+  public class WikipediaFeedIrcListener implements IRCEventListener {
+    public void onRegistered() {
+      log.info("Connected");
+    }
+
+    public void onDisconnected() {
+      log.info("Disconnected");
+    }
+
+    public void onError(String msg) {
+      log.info("Error: " + msg);
+    }
+
+    public void onError(int num, String msg) {
+      log.info("Error #" + num + ": " + msg);
+    }
+
+    public void onInvite(String chan, IRCUser u, String nickPass) {
+      log.info(chan + "> " + u.getNick() + " invites " + nickPass);
+    }
+
+    public void onJoin(String chan, IRCUser u) {
+      log.info(chan + "> " + u.getNick() + " joins");
+    }
+
+    public void onKick(String chan, IRCUser u, String nickPass, String msg) {
+      log.info(chan + "> " + u.getNick() + " kicks " + nickPass);
+    }
+
+    public void onMode(IRCUser u, String nickPass, String mode) {
+      log.info("Mode: " + u.getNick() + " sets modes " + mode + " " + nickPass);
+    }
+
+    public void onMode(String chan, IRCUser u, IRCModeParser mp) {
+      log.info(chan + "> " + u.getNick() + " sets mode: " + mp.getLine());
+    }
+
+    public void onNick(IRCUser u, String nickNew) {
+      log.info("Nick: " + u.getNick() + " is now known as " + nickNew);
+    }
+
+    public void onNotice(String target, IRCUser u, String msg) {
+      log.info(target + "> " + u.getNick() + " (notice): " + msg);
+    }
+
+    public void onPart(String chan, IRCUser u, String msg) {
+      log.info(chan + "> " + u.getNick() + " parts");
+    }
+
+    /** Wraps a channel message in a {@link WikipediaFeedEvent} and hands it to that channel's listeners. */
+    public void onPrivmsg(String chan, IRCUser u, String msg) {
+      // ConcurrentHashMap rejects null keys, so guard the lookup.
+      Set<WikipediaFeedListener> listeners = (chan == null) ? null : channelListeners.get(chan);
+
+      if (listeners != null) {
+        WikipediaFeedEvent event = new WikipediaFeedEvent(System.currentTimeMillis(), chan, u.getNick(), msg);
+
+        // The sets are CopyOnWriteArraySets (see listen), so iteration works on a snapshot and a
+        // concurrent listen/unlisten cannot cause a ConcurrentModificationException.
+        for (WikipediaFeedListener listener : listeners) {
+          listener.onEvent(event);
+        }
+      }
+
+      // Every message passes through here; avoid building the string when debug is off.
+      if (log.isDebugEnabled()) {
+        log.debug(chan + "> " + u.getNick() + ": " + msg);
+      }
+    }
+
+    public void onQuit(IRCUser u, String msg) {
+      log.info("Quit: " + u.getNick());
+    }
+
+    public void onReply(int num, String value, String msg) {
+      log.info("Reply #" + num + ": " + value + " " + msg);
+    }
+
+    public void onTopic(String chan, IRCUser u, String topic) {
+      log.info(chan + "> " + u.getNick() + " changes topic into: " + topic);
+    }
+
+    public void onPing(String p) {
+    }
+
+    public void unknown(String a, String b, String c, String d) {
+      log.warn("UNKNOWN: " + a + " " + b + " " + c + " " + d);
+    }
+  }
+
+  /** Callback for messages received on a channel passed to {@link WikipediaFeed#listen}. */
+  public static interface WikipediaFeedListener {
+    void onEvent(WikipediaFeedEvent event);
+  }
+
+  /**
+   * Immutable message received on a channel: receive time (epoch millis from
+   * {@link System#currentTimeMillis()}), channel, sender nick ("source"), and raw message text.
+   */
+  public static final class WikipediaFeedEvent {
+    private final long time;
+    private final String channel;
+    private final String source;
+    private final String rawEvent;
+
+    public WikipediaFeedEvent(long time, String channel, String source, String rawEvent) {
+      this.time = time;
+      this.channel = channel;
+      this.source = source;
+      this.rawEvent = rawEvent;
+    }
+
+    /**
+     * Builds an event from a map shaped like {@link #toMap}'s output. {@code time} may be any
+     * {@link Number}, because Jackson decodes JSON integers that fit in an int as {@link Integer}.
+     */
+    public WikipediaFeedEvent(Map<String, Object> jsonObject) {
+      this(((Number) jsonObject.get("time")).longValue(), (String) jsonObject.get("channel"), (String) jsonObject.get("source"), (String) jsonObject.get("raw"));
+    }
+
+    public long getTime() {
+      return time;
+    }
+
+    public String getChannel() {
+      return channel;
+    }
+
+    public String getSource() {
+      return source;
+    }
+
+    public String getRawEvent() {
+      return rawEvent;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((channel == null) ? 0 : channel.hashCode());
+      result = prime * result + ((rawEvent == null) ? 0 : rawEvent.hashCode());
+      result = prime * result + ((source == null) ? 0 : source.hashCode());
+      result = prime * result + (int) (time ^ (time >>> 32));
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      WikipediaFeedEvent other = (WikipediaFeedEvent) obj;
+      if (channel == null) {
+        if (other.channel != null)
+          return false;
+      } else if (!channel.equals(other.channel))
+        return false;
+      if (rawEvent == null) {
+        if (other.rawEvent != null)
+          return false;
+      } else if (!rawEvent.equals(other.rawEvent))
+        return false;
+      if (source == null) {
+        if (other.source != null)
+          return false;
+      } else if (!source.equals(other.source))
+        return false;
+      if (time != other.time)
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "WikipediaFeedEvent [time=" + time + ", channel=" + channel + ", source=" + source + ", rawEvent=" + rawEvent + "]";
+    }
+
+    /** Serializes this event as a JSON object with keys time, channel, source and raw. */
+    public String toJson() {
+      return toJson(this);
+    }
+
+    /** Returns a new mutable map with keys time, channel, source and raw. */
+    public static Map<String, Object> toMap(WikipediaFeedEvent event) {
+      Map<String, Object> jsonObject = new HashMap<String, Object>();
+
+      jsonObject.put("time", event.getTime());
+      jsonObject.put("channel", event.getChannel());
+      jsonObject.put("source", event.getSource());
+      jsonObject.put("raw", event.getRawEvent());
+
+      return jsonObject;
+    }
+
+    /**
+     * Serializes {@code event} as JSON.
+     *
+     * @throws SamzaException wrapping any serialization failure
+     */
+    public static String toJson(WikipediaFeedEvent event) {
+      Map<String, Object> jsonObject = toMap(event);
+
+      try {
+        return jsonMapper.writeValueAsString(jsonObject);
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+
+    /**
+     * Parses JSON produced by {@link #toJson}.
+     *
+     * @throws SamzaException wrapping any parse or conversion failure
+     */
+    @SuppressWarnings("unchecked")
+    public static WikipediaFeedEvent fromJson(String json) {
+      try {
+        return new WikipediaFeedEvent((Map<String, Object>) jsonMapper.readValue(json, Map.class));
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+
+  /** Manual smoke test: prints #en.wikipedia messages from irc.wikimedia.org for 20 seconds. */
+  public static void main(String[] args) throws InterruptedException {
+    WikipediaFeed feed = new WikipediaFeed("irc.wikimedia.org", 6667);
+    feed.start();
+
+    feed.listen("#en.wikipedia", new WikipediaFeedListener() {
+      @Override
+      public void onEvent(WikipediaFeedEvent event) {
+        System.out.println(event);
+      }
+    });
+
+    Thread.sleep(20000);
+    feed.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java b/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
new file mode 100644
index 0000000..d1612c9
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.system;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+
+/**
+ * {@link SystemFactory} for a Wikipedia IRC feed system. Consumers read
+ * {@code systems.<name>.host} and {@code systems.<name>.port} from config. The admin is a
+ * {@link SinglePartitionWithoutOffsetsSystemAdmin}, matching the consumer, which puts every message
+ * on partition 0 with no offset. Producing is not supported.
+ */
+public class WikipediaSystemFactory implements SystemFactory {
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new SinglePartitionWithoutOffsetsSystemAdmin();
+  }
+
+  /**
+   * Builds a {@link WikipediaConsumer} backed by a {@link WikipediaFeed} for the configured server.
+   *
+   * @throws SamzaException if {@code systems.<systemName>.host} is missing or blank
+   */
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    String hostKey = "systems." + systemName + ".host";
+    String host = config.get(hostKey);
+
+    // Fail fast with a clear message instead of handing a null host to the IRC connection.
+    if (host == null || host.trim().isEmpty()) {
+      throw new SamzaException("Missing required config " + hostKey + " for system " + systemName + ".");
+    }
+
+    int port = config.getInt("systems." + systemName + ".port");
+    WikipediaFeed feed = new WikipediaFeed(host, port);
+
+    return new WikipediaConsumer(systemName, feed, registry);
+  }
+
+  /** Always throws: the Wikipedia feed is read-only. */
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    throw new SamzaException("You can't produce to a Wikipedia feed! How about making some edits to a Wiki, instead?");
+  }
+}