You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/12/09 19:15:08 UTC
[1/3] incubator-samza-hello-samza git commit: SAMZA-495;
upgrade to 0.9.0 samza
Repository: incubator-samza-hello-samza
Updated Branches:
refs/heads/latest 68d9961ef -> 20a32e014
refs/heads/master 59d1877f0 -> f9efa43ac
SAMZA-495; upgrade to 0.9.0 samza
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/commit/20a32e01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/tree/20a32e01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/diff/20a32e01
Branch: refs/heads/latest
Commit: 20a32e014635d8dd50ac716f1bddcc5d80f1b042
Parents: 68d9961
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Dec 9 10:14:20 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Dec 9 10:14:20 2014 -0800
----------------------------------------------------------------------
pom.xml | 6 +++---
src/main/assembly/src.xml | 3 +--
src/main/config/wikipedia-stats.properties | 2 +-
3 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/20a32e01/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0e1f918..ab73a07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
<groupId>org.apache.samza</groupId>
<artifactId>hello-samza</artifactId>
- <version>0.8.0</version>
+ <version>0.9.0</version>
<packaging>jar</packaging>
<name>Samza Example</name>
<description>
@@ -70,7 +70,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
- <artifactId>samza-kv-leveldb_2.10</artifactId>
+ <artifactId>samza-kv-rocksdb_2.10</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
@@ -113,7 +113,7 @@ under the License.
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>0.8.0-SNAPSHOT</samza.version>
+ <samza.version>0.9.0-SNAPSHOT</samza.version>
</properties>
<developers>
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/20a32e01/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 8f9afd8..f57fee2 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -66,9 +66,8 @@
<includes>
<include>org.apache.samza:samza-core_2.10</include>
<include>org.apache.samza:samza-kafka_2.10</include>
- <include>org.apache.samza:samza-serializers_2.10</include>
<include>org.apache.samza:samza-yarn_2.10</include>
- <include>org.apache.samza:samza-kv-leveldb_2.10</include>
+ <include>org.apache.samza:samza-kv-rocksdb_2.10</include>
<include>org.apache.samza:samza-log4j</include>
<include>org.apache.samza:hello-samza</include>
<include>org.slf4j:slf4j-log4j12</include>
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/20a32e01/src/main/config/wikipedia-stats.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties
index d06d559..69eff90 100644
--- a/src/main/config/wikipedia-stats.properties
+++ b/src/main/config/wikipedia-stats.properties
@@ -47,7 +47,7 @@ systems.kafka.producer.producer.type=sync
systems.kafka.producer.batch.num.messages=1
# Key-value storage
-stores.wikipedia-stats.factory=org.apache.samza.storage.kv.LevelDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
stores.wikipedia-stats.key.serde=string
stores.wikipedia-stats.msg.serde=integer
[2/3] incubator-samza-hello-samza git commit: SAMZA-495;
upgrade to 0.8.0 samza
Posted by cr...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
new file mode 100644
index 0000000..07cd8ac
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.task;
+
+import java.util.Map;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+
+/**
+ * This task is very simple. All it does is take messages that it receives, and
+ * sends them to a Kafka topic called wikipedia-raw.
+ */
+public class WikipediaFeedStreamTask implements StreamTask {
+ private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-raw");
+
+ @Override
+ public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+ Map<String, Object> outgoingMap = WikipediaFeedEvent.toMap((WikipediaFeedEvent) envelope.getMessage());
+ collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
new file mode 100644
index 0000000..0505f58
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.task;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+
+public class WikipediaParserStreamTask implements StreamTask {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+ Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
+ WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
+
+ try {
+ Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
+
+ parsedJsonObject.put("channel", event.getChannel());
+ parsedJsonObject.put("source", event.getSource());
+ parsedJsonObject.put("time", event.getTime());
+
+ collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
+ } catch (Exception e) {
+ System.err.println("Unable to parse line: " + event);
+ }
+ }
+
+ public static Map<String, Object> parse(String line) {
+ Pattern p = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)");
+ Matcher m = p.matcher(line);
+
+ if (m.find() && m.groupCount() == 6) {
+ String title = m.group(1);
+ String flags = m.group(2);
+ String diffUrl = m.group(3);
+ String user = m.group(4);
+ int byteDiff = Integer.parseInt(m.group(5));
+ String summary = m.group(6);
+
+ Map<String, Boolean> flagMap = new HashMap<String, Boolean>();
+
+ flagMap.put("is-minor", flags.contains("M"));
+ flagMap.put("is-new", flags.contains("N"));
+ flagMap.put("is-unpatrolled", flags.contains("!"));
+ flagMap.put("is-bot-edit", flags.contains("B"));
+ flagMap.put("is-special", title.startsWith("Special:"));
+ flagMap.put("is-talk", title.startsWith("Talk:"));
+
+ Map<String, Object> root = new HashMap<String, Object>();
+
+ root.put("title", title);
+ root.put("user", user);
+ root.put("unparsed-flags", flags);
+ root.put("diff-bytes", byteDiff);
+ root.put("diff-url", diffUrl);
+ root.put("summary", summary);
+ root.put("flags", flagMap);
+
+ return root;
+ } else {
+ throw new IllegalArgumentException();
+ }
+ }
+
+ public static void main(String[] args) {
+ String[] lines = new String[] { "[[Wikipedia talk:Articles for creation/Lords of War]] http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]", "[[David Shepard (surgeon)]] M http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * Jacobsievers * (+115) /* American Revolution (1775–1783) */ Added to note regarding David Shepard's brothers" };
+
+ for (String line : lines) {
+ System.out.println(parse(line));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
new file mode 100644
index 0000000..60fd93d
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.task;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.WindowableTask;
+
+public class WikipediaStatsStreamTask implements StreamTask, InitableTask, WindowableTask {
+ private int edits = 0;
+ private int byteDiff = 0;
+ private Set<String> titles = new HashSet<String>();
+ private Map<String, Integer> counts = new HashMap<String, Integer>();
+ private KeyValueStore<String, Integer> store;
+
+ public void init(Config config, TaskContext context) {
+ this.store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+ Map<String, Object> edit = (Map<String, Object>) envelope.getMessage();
+ Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags");
+
+ Integer editsAllTime = store.get("count-edits-all-time");
+ if (editsAllTime == null) editsAllTime = 0;
+ store.put("count-edits-all-time", editsAllTime + 1);
+
+ edits += 1;
+ titles.add((String) edit.get("title"));
+ byteDiff += (Integer) edit.get("diff-bytes");
+
+ for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
+ if (Boolean.TRUE.equals(flag.getValue())) {
+ Integer count = counts.get(flag.getKey());
+
+ if (count == null) {
+ count = 0;
+ }
+
+ count += 1;
+ counts.put(flag.getKey(), count);
+ }
+ }
+ }
+
+ @Override
+ public void window(MessageCollector collector, TaskCoordinator coordinator) {
+ counts.put("edits", edits);
+ counts.put("bytes-added", byteDiff);
+ counts.put("unique-titles", titles.size());
+ counts.put("edits-all-time", store.get("count-edits-all-time"));
+
+ collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-stats"), counts));
+
+ // Reset counts after windowing.
+ edits = 0;
+ byteDiff = 0;
+ titles = new HashSet<String>();
+ counts = new HashMap<String, Integer>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
new file mode 100644
index 0000000..f0de765
--- /dev/null
+++ b/src/main/resources/log4j.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="jmx" class="org.apache.samza.logging.log4j.JmxAppender" />
+
+ <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
+ <param name="DatePattern" value="'.'yyyy-MM-dd" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
+ </layout>
+ </appender>
+ <root>
+ <priority value="info" />
+ <appender-ref ref="RollingAppender"/>
+ <appender-ref ref="jmx" />
+ </root>
+</log4j:configuration>
[3/3] incubator-samza-hello-samza git commit: SAMZA-495;
upgrade to 0.8.0 samza
Posted by cr...@apache.org.
SAMZA-495; upgrade to 0.8.0 samza
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/commit/f9efa43a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/tree/f9efa43a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/diff/f9efa43a
Branch: refs/heads/master
Commit: f9efa43acb477ffba989ca78c5e66d9e93f4b68f
Parents: 59d1877
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Dec 9 10:14:49 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Dec 9 10:14:49 2014 -0800
----------------------------------------------------------------------
.gitignore | 1 +
bin/grid | 8 +-
pom.xml | 195 ++++++-----
samza-job-package/pom.xml | 121 -------
samza-job-package/src/main/assembly/src.xml | 80 -----
.../src/main/config/wikipedia-feed.properties | 44 ---
.../src/main/config/wikipedia-parser.properties | 52 ---
.../src/main/config/wikipedia-stats.properties | 53 ---
samza-job-package/src/main/resources/log4j.xml | 36 --
samza-wikipedia/pom.xml | 65 ----
.../wikipedia/system/WikipediaConsumer.java | 77 -----
.../wikipedia/system/WikipediaFeed.java | 332 -------------------
.../system/WikipediaSystemFactory.java | 50 ---
.../wikipedia/task/WikipediaFeedStreamTask.java | 43 ---
.../task/WikipediaParserStreamTask.java | 98 ------
.../task/WikipediaStatsStreamTask.java | 92 -----
src/main/assembly/src.xml | 81 +++++
src/main/config/wikipedia-feed.properties | 44 +++
src/main/config/wikipedia-parser.properties | 52 +++
src/main/config/wikipedia-stats.properties | 57 ++++
.../wikipedia/system/WikipediaConsumer.java | 77 +++++
.../wikipedia/system/WikipediaFeed.java | 332 +++++++++++++++++++
.../system/WikipediaSystemFactory.java | 50 +++
.../wikipedia/task/WikipediaFeedStreamTask.java | 43 +++
.../task/WikipediaParserStreamTask.java | 98 ++++++
.../task/WikipediaStatsStreamTask.java | 92 +++++
src/main/resources/log4j.xml | 39 +++
27 files changed, 1077 insertions(+), 1235 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 1898309..0435c14 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@ target/
*.iws
*/.cache
deploy
+*.swp
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/bin/grid
----------------------------------------------------------------------
diff --git a/bin/grid b/bin/grid
index 4324c92..25b2ec4 100755
--- a/bin/grid
+++ b/bin/grid
@@ -35,8 +35,8 @@ DOWNLOAD_CACHE_DIR=$HOME/.samza/download
COMMAND=$1
SYSTEM=$2
-DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
-DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.2.0/hadoop-2.2.0.tar.gz
+DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
+DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz
DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
bootstrap() {
@@ -63,7 +63,7 @@ install_zookeeper() {
install_yarn() {
mkdir -p "$DEPLOY_ROOT_DIR"
- install yarn $DOWNLOAD_YARN hadoop-2.2.0
+ install yarn $DOWNLOAD_YARN hadoop-2.4.0
cp "$BASE_DIR/conf/yarn-site.xml" "$DEPLOY_ROOT_DIR/yarn/etc/hadoop/yarn-site.xml"
if [ ! -f "$HOME/.samza/conf/yarn-site.xml" ]; then
mkdir -p "$HOME/.samza/conf"
@@ -73,7 +73,7 @@ install_yarn() {
install_kafka() {
mkdir -p "$DEPLOY_ROOT_DIR"
- install kafka $DOWNLOAD_KAFKA kafka_2.9.2-0.8.1.1
+ install kafka $DOWNLOAD_KAFKA kafka_2.10-0.8.1.1
# have to use SIGTERM since nohup on appears to ignore SIGINT
# and Kafka switched to SIGINT in KAFKA-1031.
sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 90f6c03..0891177 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,99 +25,102 @@ under the License.
<maven>3.0.0</maven>
</prerequisites>
- <groupId>samza</groupId>
- <artifactId>samza-example-parent</artifactId>
- <version>0.7.0</version>
- <packaging>pom</packaging>
- <name>Samza Parent</name>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>hello-samza</artifactId>
+ <version>0.8.0</version>
+ <packaging>jar</packaging>
+ <name>Samza Example</name>
<description>
Samza is a stream processing system. Think of it as Map-Reduce for streams.
</description>
- <url>https://github.com/linkedin/hello-samza</url>
+ <url>https://samza.incubator.apache.org/</url>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>samza</groupId>
- <artifactId>samza-wikipedia</artifactId>
- <version>0.7.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-api</artifactId>
- <version>${samza.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-core_2.10</artifactId>
- <version>${samza.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-serializers_2.10</artifactId>
- <version>${samza.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-shell</artifactId>
- <classifier>dist</classifier>
- <type>tgz</type>
- <version>${samza.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-yarn_2.10</artifactId>
- <version>${samza.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-kv_2.10</artifactId>
- <version>${samza.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-kafka_2.10</artifactId>
- <version>${samza.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.1</version>
- </dependency>
- <dependency>
- <groupId>org.schwering</groupId>
- <artifactId>irclib</artifactId>
- <version>1.10</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.6.2</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.6.2</version>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-jaxrs</artifactId>
- <version>1.8.5</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-api</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-core_2.10</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-log4j</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-serializers_2.10</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-shell</artifactId>
+ <classifier>dist</classifier>
+ <type>tgz</type>
+ <version>${samza.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-yarn_2.10</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-kv_2.10</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-kv-rocksdb_2.10</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-kafka_2.10</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ <version>0.8.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.schwering</groupId>
+ <artifactId>irclib</artifactId>
+ <version>1.10</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.6.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.6.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-jaxrs</artifactId>
+ <version>1.8.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>2.4.0</version>
+ </dependency>
+ </dependencies>
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>0.7.0</samza.version>
+ <samza.version>0.8.0</samza.version>
</properties>
- <modules>
- <module>samza-job-package</module>
- <module>samza-wikipedia</module>
- </modules>
-
<developers>
<developer>
<name>Chris Riccomini</name>
@@ -159,11 +162,6 @@ under the License.
<name>Scala-tools Maven2 Repository</name>
<url>https://oss.sonatype.org/content/groups/scala-tools</url>
</repository>
- <!-- for zkclient -->
- <repository>
- <id>sonatype</id>
- <url>http://oss.sonatype.org/content/groups/public</url>
- </repository>
</repositories>
<pluginRepositories>
@@ -183,6 +181,7 @@ under the License.
<version>0.9</version>
<configuration>
<excludes>
+ <exclude>**/target/**</exclude>
<exclude>*.json</exclude>
<exclude>.vagrant/**</exclude>
<exclude>.git/**</exclude>
@@ -193,6 +192,7 @@ under the License.
<exclude>.gitignore</exclude>
<exclude>**/.cache/**</exclude>
<exclude>deploy/**</exclude>
+ <exclude>**/.project</exclude>
</excludes>
</configuration>
</plugin>
@@ -221,6 +221,25 @@ under the License.
</execution>
</executions>
</plugin>
+ <!-- plugin to build the tar.gz file filled with examples -->
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.3</version>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/src.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/pom.xml
----------------------------------------------------------------------
diff --git a/samza-job-package/pom.xml b/samza-job-package/pom.xml
deleted file mode 100644
index 169a28f..0000000
--- a/samza-job-package/pom.xml
+++ /dev/null
@@ -1,121 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>samza</groupId>
- <artifactId>samza-example-parent</artifactId>
- <version>0.7.0</version>
- </parent>
-
- <artifactId>samza-job-package</artifactId>
- <name>Samza Job Package</name>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>samza</groupId>
- <artifactId>samza-wikipedia</artifactId>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-shell</artifactId>
- <classifier>dist</classifier>
- <type>tgz</type>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-core_2.10</artifactId>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-serializers_2.10</artifactId>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-yarn_2.10</artifactId>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-kv_2.10</artifactId>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-kafka_2.10</artifactId>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>2.2.0</version>
- <scope>runtime</scope>
- </dependency>
- </dependencies>
-
- <licenses>
- <license>
- <name>Apache License 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
- <distribution>repo</distribution>
- </license>
- </licenses>
-
- <build>
- <plugins>
- <!-- plugin to build the tar.gz file filled with examples -->
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.3</version>
- <configuration>
- <descriptors>
- <descriptor>src/main/assembly/src.xml</descriptor>
- </descriptors>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/samza-job-package/src/main/assembly/src.xml b/samza-job-package/src/main/assembly/src.xml
deleted file mode 100644
index 14a5ad5..0000000
--- a/samza-job-package/src/main/assembly/src.xml
+++ /dev/null
@@ -1,80 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- you under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
-
-<assembly
- xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
- <id>dist</id>
- <formats>
- <format>tar.gz</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>${basedir}/..</directory>
- <includes>
- <include>README*</include>
- <include>LICENSE*</include>
- <include>NOTICE*</include>
- </includes>
- </fileSet>
- </fileSets>
- <files>
- <file>
- <source>${basedir}/src/main/resources/log4j.xml</source>
- <outputDirectory>lib</outputDirectory>
- </file>
- <!-- filtered=true, so we do variable expansion so the yarn package path
- always points to the correct spot on any machine -->
- <file>
- <source>${basedir}/src/main/config/wikipedia-feed.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/wikipedia-parser.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
- <source>${basedir}/src/main/config/wikipedia-stats.properties</source>
- <outputDirectory>config</outputDirectory>
- <filtered>true</filtered>
- </file>
- </files>
- <dependencySets>
- <dependencySet>
- <outputDirectory>bin</outputDirectory>
- <includes>
- <include>org.apache.samza:samza-shell:tgz:dist:*</include>
- </includes>
- <fileMode>0744</fileMode>
- <unpack>true</unpack>
- </dependencySet>
- <dependencySet>
- <outputDirectory>lib</outputDirectory>
- <includes>
- <include>org.apache.samza:samza-core_2.10</include>
- <include>org.apache.samza:samza-kafka_2.10</include>
- <include>org.apache.samza:samza-serializers_2.10</include>
- <include>org.apache.samza:samza-yarn_2.10</include>
- <include>org.apache.samza:samza-kv_2.10</include>
- <include>org.slf4j:slf4j-log4j12</include>
- <include>samza:samza-wikipedia</include>
- <include>org.apache.kafka:kafka_2.10</include>
- <include>org.apache.hadoop:hadoop-hdfs</include>
- </includes>
- <useTransitiveFiltering>true</useTransitiveFiltering>
- </dependencySet>
- </dependencySets>
-</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/config/wikipedia-feed.properties
----------------------------------------------------------------------
diff --git a/samza-job-package/src/main/config/wikipedia-feed.properties b/samza-job-package/src/main/config/wikipedia-feed.properties
deleted file mode 100644
index c498c16..0000000
--- a/samza-job-package/src/main/config/wikipedia-feed.properties
+++ /dev/null
@@ -1,44 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Job
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-job.name=wikipedia-feed
-
-# YARN
-yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
-
-# Task
-task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask
-task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews
-
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-
-# Wikipedia System
-systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
-systems.wikipedia.host=irc.wikimedia.org
-systems.wikipedia.port=6667
-
-# Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=json
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.producer.metadata.broker.list=localhost:9092
-systems.kafka.producer.producer.type=sync
-# Normally, we'd set this much higher, but we want things to look snappy in the demo.
-systems.kafka.producer.batch.num.messages=1
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/config/wikipedia-parser.properties
----------------------------------------------------------------------
diff --git a/samza-job-package/src/main/config/wikipedia-parser.properties b/samza-job-package/src/main/config/wikipedia-parser.properties
deleted file mode 100644
index 38575b6..0000000
--- a/samza-job-package/src/main/config/wikipedia-parser.properties
+++ /dev/null
@@ -1,52 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Job
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-job.name=wikipedia-parser
-
-# YARN
-yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
-
-# Task
-task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
-task.inputs=kafka.wikipedia-raw
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka
-# Normally, this would be 3, but we have only one broker.
-task.checkpoint.replication.factor=1
-
-# Metrics
-metrics.reporters=snapshot,jmx
-metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
-metrics.reporter.snapshot.stream=kafka.metrics
-metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
-
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
-
-# Systems
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=json
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.consumer.auto.offset.reset=largest
-systems.kafka.producer.metadata.broker.list=localhost:9092
-systems.kafka.producer.producer.type=sync
-# Normally, we'd set this much higher, but we want things to look snappy in the demo.
-systems.kafka.producer.batch.num.messages=1
-systems.kafka.streams.metrics.samza.msg.serde=metrics
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/config/wikipedia-stats.properties
----------------------------------------------------------------------
diff --git a/samza-job-package/src/main/config/wikipedia-stats.properties b/samza-job-package/src/main/config/wikipedia-stats.properties
deleted file mode 100644
index be0c749..0000000
--- a/samza-job-package/src/main/config/wikipedia-stats.properties
+++ /dev/null
@@ -1,53 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Job
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-job.name=wikipedia-stats
-
-# YARN
-yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
-
-# Task
-task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask
-task.inputs=kafka.wikipedia-edits
-task.window.ms=10000
-
-# Serializers
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
-
-# Systems
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.msg.serde=json
-systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.consumer.auto.offset.reset=largest
-systems.kafka.producer.metadata.broker.list=localhost:9092
-systems.kafka.producer.producer.type=sync
-# Normally, we'd set this much higher, but we want things to look snappy in the demo.
-systems.kafka.producer.batch.num.messages=1
-
-# Key-value storage
-stores.wikipedia-stats.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
-stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
-stores.wikipedia-stats.key.serde=string
-stores.wikipedia-stats.msg.serde=integer
-
-# Normally, we'd set this much higher, but we want things to look snappy in the demo.
-stores.wikipedia-stats.write.batch.size=0
-stores.wikipedia-stats.object.cache.size=0
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-job-package/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-job-package/src/main/resources/log4j.xml b/samza-job-package/src/main/resources/log4j.xml
deleted file mode 100644
index a937165..0000000
--- a/samza-job-package/src/main/resources/log4j.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
- <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
- <param name="DatePattern" value="'.'yyyy-MM-dd" />
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
- </layout>
- </appender>
- <root>
- <priority value="info" />
- <appender-ref ref="RollingAppender"/>
- </root>
-</log4j:configuration>
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/pom.xml
----------------------------------------------------------------------
diff --git a/samza-wikipedia/pom.xml b/samza-wikipedia/pom.xml
deleted file mode 100644
index 20d94ed..0000000
--- a/samza-wikipedia/pom.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>samza</groupId>
- <artifactId>samza-example-parent</artifactId>
- <version>0.7.0</version>
- </parent>
-
- <artifactId>samza-wikipedia</artifactId>
- <name>Samza Wikipedia Example</name>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.schwering</groupId>
- <artifactId>irclib</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.samza</groupId>
- <artifactId>samza-kv_2.10</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-jaxrs</artifactId>
- </dependency>
- </dependencies>
-
- <licenses>
- <license>
- <name>Apache License 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
- <distribution>repo</distribution>
- </license>
- </licenses>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java
----------------------------------------------------------------------
diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java
deleted file mode 100644
index f156c3b..0000000
--- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.system;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.samza.Partition;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.BlockingEnvelopeMap;
-import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
-import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedListener;
-
-public class WikipediaConsumer extends BlockingEnvelopeMap implements WikipediaFeedListener {
- private final List<String> channels;
- private final String systemName;
- private final WikipediaFeed feed;
-
- public WikipediaConsumer(String systemName, WikipediaFeed feed, MetricsRegistry registry) {
- this.channels = new ArrayList<String>();
- this.systemName = systemName;
- this.feed = feed;
- }
-
- public void onEvent(final WikipediaFeedEvent event) {
- SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, event.getChannel(), new Partition(0));
-
- try {
- put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, event));
- } catch (Exception e) {
- System.err.println(e);
- }
- }
-
- @Override
- public void register(SystemStreamPartition systemStreamPartition, String startingOffset) {
- super.register(systemStreamPartition, startingOffset);
-
- channels.add(systemStreamPartition.getStream());
- }
-
- @Override
- public void start() {
- feed.start();
-
- for (String channel : channels) {
- feed.listen(channel, this);
- }
- }
-
- @Override
- public void stop() {
- for (String channel : channels) {
- feed.unlisten(channel, this);
- }
-
- feed.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java
----------------------------------------------------------------------
diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java
deleted file mode 100644
index 16e302e..0000000
--- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.system;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import org.apache.samza.SamzaException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.schwering.irc.lib.IRCConnection;
-import org.schwering.irc.lib.IRCEventListener;
-import org.schwering.irc.lib.IRCModeParser;
-import org.schwering.irc.lib.IRCUser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WikipediaFeed {
- private static final Logger log = LoggerFactory.getLogger(WikipediaFeed.class);
- private static final Random random = new Random();
- private static final ObjectMapper jsonMapper = new ObjectMapper();
-
- private final Map<String, Set<WikipediaFeedListener>> channelListeners;
- private final String host;
- private final int port;
- private final IRCConnection conn;
- private final String nick;
-
- public WikipediaFeed(String host, int port) {
- this.channelListeners = new HashMap<String, Set<WikipediaFeedListener>>();
- this.host = host;
- this.port = port;
- this.nick = "samza-bot-" + Math.abs(random.nextInt());
- this.conn = new IRCConnection(host, new int[] { port }, "", nick, nick, nick);
- this.conn.addIRCEventListener(new WikipediaFeedIrcListener());
- this.conn.setEncoding("UTF-8");
- this.conn.setPong(true);
- this.conn.setColors(false);
- }
-
- public void start() {
- try {
- this.conn.connect();
- } catch (IOException e) {
- throw new RuntimeException("Unable to connect to " + host + ":" + port + ".", e);
- }
- }
-
- public void stop() {
- this.conn.interrupt();
-
- try {
- this.conn.join();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while trying to shutdown IRC connection for " + host + ":" + port, e);
- }
-
- if (this.conn.isAlive()) {
- throw new RuntimeException("Unable to shutdown IRC connection for " + host + ":" + port);
- }
- }
-
- public void listen(String channel, WikipediaFeedListener listener) {
- Set<WikipediaFeedListener> listeners = channelListeners.get(channel);
-
- if (listeners == null) {
- listeners = new HashSet<WikipediaFeedListener>();
- channelListeners.put(channel, listeners);
- join(channel);
- }
-
- listeners.add(listener);
- }
-
- public void unlisten(String channel, WikipediaFeedListener listener) {
- Set<WikipediaFeedListener> listeners = channelListeners.get(channel);
-
- if (listeners == null) {
- throw new RuntimeException("Trying to unlisten to a channel that has no listeners in it.");
- } else if (!listeners.contains(listener)) {
- throw new RuntimeException("Trying to unlisten to a channel that listener is not listening to.");
- }
-
- listeners.remove(listener);
-
- if (listeners.size() == 0) {
- leave(channel);
- }
- }
-
- public void join(String channel) {
- conn.send("JOIN " + channel);
- }
-
- public void leave(String channel) {
- conn.send("PART " + channel);
- }
-
- public class WikipediaFeedIrcListener implements IRCEventListener {
- public void onRegistered() {
- log.info("Connected");
- }
-
- public void onDisconnected() {
- log.info("Disconnected");
- }
-
- public void onError(String msg) {
- log.info("Error: " + msg);
- }
-
- public void onError(int num, String msg) {
- log.info("Error #" + num + ": " + msg);
- }
-
- public void onInvite(String chan, IRCUser u, String nickPass) {
- log.info(chan + "> " + u.getNick() + " invites " + nickPass);
- }
-
- public void onJoin(String chan, IRCUser u) {
- log.info(chan + "> " + u.getNick() + " joins");
- }
-
- public void onKick(String chan, IRCUser u, String nickPass, String msg) {
- log.info(chan + "> " + u.getNick() + " kicks " + nickPass);
- }
-
- public void onMode(IRCUser u, String nickPass, String mode) {
- log.info("Mode: " + u.getNick() + " sets modes " + mode + " " + nickPass);
- }
-
- public void onMode(String chan, IRCUser u, IRCModeParser mp) {
- log.info(chan + "> " + u.getNick() + " sets mode: " + mp.getLine());
- }
-
- public void onNick(IRCUser u, String nickNew) {
- log.info("Nick: " + u.getNick() + " is now known as " + nickNew);
- }
-
- public void onNotice(String target, IRCUser u, String msg) {
- log.info(target + "> " + u.getNick() + " (notice): " + msg);
- }
-
- public void onPart(String chan, IRCUser u, String msg) {
- log.info(chan + "> " + u.getNick() + " parts");
- }
-
- public void onPrivmsg(String chan, IRCUser u, String msg) {
- Set<WikipediaFeedListener> listeners = channelListeners.get(chan);
-
- if (listeners != null) {
- WikipediaFeedEvent event = new WikipediaFeedEvent(System.currentTimeMillis(), chan, u.getNick(), msg);
-
- for (WikipediaFeedListener listener : listeners) {
- listener.onEvent(event);
- }
- }
-
- log.debug(chan + "> " + u.getNick() + ": " + msg);
- }
-
- public void onQuit(IRCUser u, String msg) {
- log.info("Quit: " + u.getNick());
- }
-
- public void onReply(int num, String value, String msg) {
- log.info("Reply #" + num + ": " + value + " " + msg);
- }
-
- public void onTopic(String chan, IRCUser u, String topic) {
- log.info(chan + "> " + u.getNick() + " changes topic into: " + topic);
- }
-
- public void onPing(String p) {
- }
-
- public void unknown(String a, String b, String c, String d) {
- log.warn("UNKNOWN: " + a + " " + b + " " + c + " " + d);
- }
- }
-
- public static interface WikipediaFeedListener {
- void onEvent(WikipediaFeedEvent event);
- }
-
- public static final class WikipediaFeedEvent {
- private final long time;
- private final String channel;
- private final String source;
- private final String rawEvent;
-
- public WikipediaFeedEvent(long time, String channel, String source, String rawEvent) {
- this.time = time;
- this.channel = channel;
- this.source = source;
- this.rawEvent = rawEvent;
- }
-
- public WikipediaFeedEvent(Map<String, Object> jsonObject) {
- this((Long) jsonObject.get("time"), (String) jsonObject.get("channel"), (String) jsonObject.get("source"), (String) jsonObject.get("raw"));
- }
-
- public long getTime() {
- return time;
- }
-
- public String getChannel() {
- return channel;
- }
-
- public String getSource() {
- return source;
- }
-
- public String getRawEvent() {
- return rawEvent;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((channel == null) ? 0 : channel.hashCode());
- result = prime * result + ((rawEvent == null) ? 0 : rawEvent.hashCode());
- result = prime * result + ((source == null) ? 0 : source.hashCode());
- result = prime * result + (int) (time ^ (time >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- WikipediaFeedEvent other = (WikipediaFeedEvent) obj;
- if (channel == null) {
- if (other.channel != null)
- return false;
- } else if (!channel.equals(other.channel))
- return false;
- if (rawEvent == null) {
- if (other.rawEvent != null)
- return false;
- } else if (!rawEvent.equals(other.rawEvent))
- return false;
- if (source == null) {
- if (other.source != null)
- return false;
- } else if (!source.equals(other.source))
- return false;
- if (time != other.time)
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "WikipediaFeedEvent [time=" + time + ", channel=" + channel + ", source=" + source + ", rawEvent=" + rawEvent + "]";
- }
-
- public String toJson() {
- return toJson(this);
- }
-
- public static Map<String, Object> toMap(WikipediaFeedEvent event) {
- Map<String, Object> jsonObject = new HashMap<String, Object>();
-
- jsonObject.put("time", event.getTime());
- jsonObject.put("channel", event.getChannel());
- jsonObject.put("source", event.getSource());
- jsonObject.put("raw", event.getRawEvent());
-
- return jsonObject;
- }
-
- public static String toJson(WikipediaFeedEvent event) {
- Map<String, Object> jsonObject = toMap(event);
-
- try {
- return jsonMapper.writeValueAsString(jsonObject);
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- }
-
- @SuppressWarnings("unchecked")
- public static WikipediaFeedEvent fromJson(String json) {
- try {
- return new WikipediaFeedEvent((Map<String, Object>) jsonMapper.readValue(json, Map.class));
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- }
- }
-
- public static void main(String[] args) throws InterruptedException {
- WikipediaFeed feed = new WikipediaFeed("irc.wikimedia.org", 6667);
- feed.start();
-
- feed.listen("#en.wikipedia", new WikipediaFeedListener() {
- @Override
- public void onEvent(WikipediaFeedEvent event) {
- System.out.println(event);
- }
- });
-
- Thread.sleep(20000);
- feed.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
deleted file mode 100644
index d1612c9..0000000
--- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.system;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemProducer;
-import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
-
-public class WikipediaSystemFactory implements SystemFactory {
- @Override
- public SystemAdmin getAdmin(String systemName, Config config) {
- return new SinglePartitionWithoutOffsetsSystemAdmin();
- }
-
- @Override
- public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
- String host = config.get("systems." + systemName + ".host");
- int port = config.getInt("systems." + systemName + ".port");
- WikipediaFeed feed = new WikipediaFeed(host, port);
-
- return new WikipediaConsumer(systemName, feed, registry);
- }
-
- @Override
- public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
- throw new SamzaException("You can't produce to a Wikipedia feed! How about making some edits to a Wiki, instead?");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
deleted file mode 100644
index 07cd8ac..0000000
--- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.task;
-
-import java.util.Map;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskCoordinator;
-import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
-
-/**
- * This task is very simple. All it does is take messages that it receives, and
- * sends them to a Kafka topic called wikipedia-raw.
- */
-public class WikipediaFeedStreamTask implements StreamTask {
- private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-raw");
-
- @Override
- public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
- Map<String, Object> outgoingMap = WikipediaFeedEvent.toMap((WikipediaFeedEvent) envelope.getMessage());
- collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
deleted file mode 100644
index 0505f58..0000000
--- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.task;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskCoordinator;
-import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
-
-public class WikipediaParserStreamTask implements StreamTask {
- @SuppressWarnings("unchecked")
- @Override
- public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
- Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
- WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
-
- try {
- Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
-
- parsedJsonObject.put("channel", event.getChannel());
- parsedJsonObject.put("source", event.getSource());
- parsedJsonObject.put("time", event.getTime());
-
- collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
- } catch (Exception e) {
- System.err.println("Unable to parse line: " + event);
- }
- }
-
- public static Map<String, Object> parse(String line) {
- Pattern p = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)");
- Matcher m = p.matcher(line);
-
- if (m.find() && m.groupCount() == 6) {
- String title = m.group(1);
- String flags = m.group(2);
- String diffUrl = m.group(3);
- String user = m.group(4);
- int byteDiff = Integer.parseInt(m.group(5));
- String summary = m.group(6);
-
- Map<String, Boolean> flagMap = new HashMap<String, Boolean>();
-
- flagMap.put("is-minor", flags.contains("M"));
- flagMap.put("is-new", flags.contains("N"));
- flagMap.put("is-unpatrolled", flags.contains("!"));
- flagMap.put("is-bot-edit", flags.contains("B"));
- flagMap.put("is-special", title.startsWith("Special:"));
- flagMap.put("is-talk", title.startsWith("Talk:"));
-
- Map<String, Object> root = new HashMap<String, Object>();
-
- root.put("title", title);
- root.put("user", user);
- root.put("unparsed-flags", flags);
- root.put("diff-bytes", byteDiff);
- root.put("diff-url", diffUrl);
- root.put("summary", summary);
- root.put("flags", flagMap);
-
- return root;
- } else {
- throw new IllegalArgumentException();
- }
- }
-
- public static void main(String[] args) {
- String[] lines = new String[] { "[[Wikipedia talk:Articles for creation/Lords of War]] http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]", "[[David Shepard (surgeon)]] M http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * Jacobsievers * (+115) /* American Revolution (1775–1783) */ Added to note regarding David Shepard's brothers" };
-
- for (String line : lines) {
- System.out.println(parse(line));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
deleted file mode 100644
index 60fd93d..0000000
--- a/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.task;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.config.Config;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-public class WikipediaStatsStreamTask implements StreamTask, InitableTask, WindowableTask {
- private int edits = 0;
- private int byteDiff = 0;
- private Set<String> titles = new HashSet<String>();
- private Map<String, Integer> counts = new HashMap<String, Integer>();
- private KeyValueStore<String, Integer> store;
-
- public void init(Config config, TaskContext context) {
- this.store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
- Map<String, Object> edit = (Map<String, Object>) envelope.getMessage();
- Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags");
-
- Integer editsAllTime = store.get("count-edits-all-time");
- if (editsAllTime == null) editsAllTime = 0;
- store.put("count-edits-all-time", editsAllTime + 1);
-
- edits += 1;
- titles.add((String) edit.get("title"));
- byteDiff += (Integer) edit.get("diff-bytes");
-
- for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
- if (Boolean.TRUE.equals(flag.getValue())) {
- Integer count = counts.get(flag.getKey());
-
- if (count == null) {
- count = 0;
- }
-
- count += 1;
- counts.put(flag.getKey(), count);
- }
- }
- }
-
- @Override
- public void window(MessageCollector collector, TaskCoordinator coordinator) {
- counts.put("edits", edits);
- counts.put("bytes-added", byteDiff);
- counts.put("unique-titles", titles.size());
- counts.put("edits-all-time", store.get("count-edits-all-time"));
-
- collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-stats"), counts));
-
- // Reset counts after windowing.
- edits = 0;
- byteDiff = 0;
- titles = new HashSet<String>();
- counts = new HashMap<String, Integer>();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
new file mode 100644
index 0000000..8a8556d
--- /dev/null
+++ b/src/main/assembly/src.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>dist</id>
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}</directory>
+ <includes>
+ <include>README*</include>
+ <include>LICENSE*</include>
+ <include>NOTICE*</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+ <files>
+ <file>
+ <source>${basedir}/src/main/resources/log4j.xml</source>
+ <outputDirectory>lib</outputDirectory>
+ </file>
+ <!-- filtered=true, so we do variable expansion so the yarn package path
+ always points to the correct spot on any machine -->
+ <file>
+ <source>${basedir}/src/main/config/wikipedia-feed.properties</source>
+ <outputDirectory>config</outputDirectory>
+ <filtered>true</filtered>
+ </file>
+ <file>
+ <source>${basedir}/src/main/config/wikipedia-parser.properties</source>
+ <outputDirectory>config</outputDirectory>
+ <filtered>true</filtered>
+ </file>
+ <file>
+ <source>${basedir}/src/main/config/wikipedia-stats.properties</source>
+ <outputDirectory>config</outputDirectory>
+ <filtered>true</filtered>
+ </file>
+ </files>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>bin</outputDirectory>
+ <includes>
+ <include>org.apache.samza:samza-shell:tgz:dist:*</include>
+ </includes>
+ <fileMode>0744</fileMode>
+ <unpack>true</unpack>
+ </dependencySet>
+ <dependencySet>
+ <outputDirectory>lib</outputDirectory>
+ <includes>
+ <include>org.apache.samza:samza-core_2.10</include>
+ <include>org.apache.samza:samza-kafka_2.10</include>
+ <include>org.apache.samza:samza-serializers_2.10</include>
+ <include>org.apache.samza:samza-yarn_2.10</include>
+ <include>org.apache.samza:samza-kv-rocksdb_2.10</include>
+ <include>org.apache.samza:samza-log4j</include>
+ <include>org.apache.samza:hello-samza</include>
+ <include>org.slf4j:slf4j-log4j12</include>
+ <include>org.apache.kafka:kafka_2.10</include>
+ <include>org.apache.hadoop:hadoop-hdfs</include>
+ </includes>
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+ </dependencySet>
+ </dependencySets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/config/wikipedia-feed.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-feed.properties b/src/main/config/wikipedia-feed.properties
new file mode 100644
index 0000000..c498c16
--- /dev/null
+++ b/src/main/config/wikipedia-feed.properties
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=wikipedia-feed
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask
+task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.metadata.broker.list=localhost:9092
+systems.kafka.producer.producer.type=sync
+# Normally, we'd set this much higher, but we want things to look snappy in the demo.
+systems.kafka.producer.batch.num.messages=1
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/config/wikipedia-parser.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-parser.properties b/src/main/config/wikipedia-parser.properties
new file mode 100644
index 0000000..38575b6
--- /dev/null
+++ b/src/main/config/wikipedia-parser.properties
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=wikipedia-parser
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
+task.inputs=kafka.wikipedia-raw
+task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
+task.checkpoint.system=kafka
+# Normally, this would be 3, but we have only one broker.
+task.checkpoint.replication.factor=1
+
+# Metrics
+metrics.reporters=snapshot,jmx
+metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+metrics.reporter.snapshot.stream=kafka.metrics
+metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
+
+# Systems
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.consumer.auto.offset.reset=largest
+systems.kafka.producer.metadata.broker.list=localhost:9092
+systems.kafka.producer.producer.type=sync
+# Normally, we'd set this much higher, but we want things to look snappy in the demo.
+systems.kafka.producer.batch.num.messages=1
+systems.kafka.streams.metrics.samza.msg.serde=metrics
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/config/wikipedia-stats.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties
new file mode 100644
index 0000000..69eff90
--- /dev/null
+++ b/src/main/config/wikipedia-stats.properties
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=wikipedia-stats
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask
+task.inputs=kafka.wikipedia-edits
+task.window.ms=10000
+task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
+task.checkpoint.system=kafka
+# Normally, this would be 3, but we have only one broker.
+task.checkpoint.replication.factor=1
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Systems
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.consumer.auto.offset.reset=largest
+systems.kafka.producer.metadata.broker.list=localhost:9092
+systems.kafka.producer.producer.type=sync
+# Normally, we'd set this much higher, but we want things to look snappy in the demo.
+systems.kafka.producer.batch.num.messages=1
+
+# Key-value storage
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
+stores.wikipedia-stats.key.serde=string
+stores.wikipedia-stats.msg.serde=integer
+
+# Normally, we'd set this much higher, but we want things to look snappy in the demo.
+stores.wikipedia-stats.write.batch.size=0
+stores.wikipedia-stats.object.cache.size=0
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java b/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java
new file mode 100644
index 0000000..f156c3b
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/system/WikipediaConsumer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.system;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.Partition;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedListener;
+
+public class WikipediaConsumer extends BlockingEnvelopeMap implements WikipediaFeedListener {
+ private final List<String> channels;
+ private final String systemName;
+ private final WikipediaFeed feed;
+
+ public WikipediaConsumer(String systemName, WikipediaFeed feed, MetricsRegistry registry) {
+ this.channels = new ArrayList<String>();
+ this.systemName = systemName;
+ this.feed = feed;
+ }
+
+ public void onEvent(final WikipediaFeedEvent event) {
+ SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, event.getChannel(), new Partition(0));
+
+ try {
+ put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, event));
+ } catch (Exception e) {
+ System.err.println(e);
+ }
+ }
+
+ @Override
+ public void register(SystemStreamPartition systemStreamPartition, String startingOffset) {
+ super.register(systemStreamPartition, startingOffset);
+
+ channels.add(systemStreamPartition.getStream());
+ }
+
+ @Override
+ public void start() {
+ feed.start();
+
+ for (String channel : channels) {
+ feed.listen(channel, this);
+ }
+ }
+
+ @Override
+ public void stop() {
+ for (String channel : channels) {
+ feed.unlisten(channel, this);
+ }
+
+ feed.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java b/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java
new file mode 100644
index 0000000..16e302e
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/system/WikipediaFeed.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.system;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.schwering.irc.lib.IRCConnection;
+import org.schwering.irc.lib.IRCEventListener;
+import org.schwering.irc.lib.IRCModeParser;
+import org.schwering.irc.lib.IRCUser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WikipediaFeed {
+ private static final Logger log = LoggerFactory.getLogger(WikipediaFeed.class);
+ private static final Random random = new Random();
+ private static final ObjectMapper jsonMapper = new ObjectMapper();
+
+ private final Map<String, Set<WikipediaFeedListener>> channelListeners;
+ private final String host;
+ private final int port;
+ private final IRCConnection conn;
+ private final String nick;
+
+ public WikipediaFeed(String host, int port) {
+ this.channelListeners = new HashMap<String, Set<WikipediaFeedListener>>();
+ this.host = host;
+ this.port = port;
+ this.nick = "samza-bot-" + Math.abs(random.nextInt());
+ this.conn = new IRCConnection(host, new int[] { port }, "", nick, nick, nick);
+ this.conn.addIRCEventListener(new WikipediaFeedIrcListener());
+ this.conn.setEncoding("UTF-8");
+ this.conn.setPong(true);
+ this.conn.setColors(false);
+ }
+
+ public void start() {
+ try {
+ this.conn.connect();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to connect to " + host + ":" + port + ".", e);
+ }
+ }
+
+ public void stop() {
+ this.conn.interrupt();
+
+ try {
+ this.conn.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while trying to shutdown IRC connection for " + host + ":" + port, e);
+ }
+
+ if (this.conn.isAlive()) {
+ throw new RuntimeException("Unable to shutdown IRC connection for " + host + ":" + port);
+ }
+ }
+
+ public void listen(String channel, WikipediaFeedListener listener) {
+ Set<WikipediaFeedListener> listeners = channelListeners.get(channel);
+
+ if (listeners == null) {
+ listeners = new HashSet<WikipediaFeedListener>();
+ channelListeners.put(channel, listeners);
+ join(channel);
+ }
+
+ listeners.add(listener);
+ }
+
+ public void unlisten(String channel, WikipediaFeedListener listener) {
+ Set<WikipediaFeedListener> listeners = channelListeners.get(channel);
+
+ if (listeners == null) {
+ throw new RuntimeException("Trying to unlisten to a channel that has no listeners in it.");
+ } else if (!listeners.contains(listener)) {
+ throw new RuntimeException("Trying to unlisten to a channel that listener is not listening to.");
+ }
+
+ listeners.remove(listener);
+
+ if (listeners.size() == 0) {
+ leave(channel);
+ }
+ }
+
+ public void join(String channel) {
+ conn.send("JOIN " + channel);
+ }
+
+ public void leave(String channel) {
+ conn.send("PART " + channel);
+ }
+
+ public class WikipediaFeedIrcListener implements IRCEventListener {
+ public void onRegistered() {
+ log.info("Connected");
+ }
+
+ public void onDisconnected() {
+ log.info("Disconnected");
+ }
+
+ public void onError(String msg) {
+ log.info("Error: " + msg);
+ }
+
+ public void onError(int num, String msg) {
+ log.info("Error #" + num + ": " + msg);
+ }
+
+ public void onInvite(String chan, IRCUser u, String nickPass) {
+ log.info(chan + "> " + u.getNick() + " invites " + nickPass);
+ }
+
+ public void onJoin(String chan, IRCUser u) {
+ log.info(chan + "> " + u.getNick() + " joins");
+ }
+
+ public void onKick(String chan, IRCUser u, String nickPass, String msg) {
+ log.info(chan + "> " + u.getNick() + " kicks " + nickPass);
+ }
+
+ public void onMode(IRCUser u, String nickPass, String mode) {
+ log.info("Mode: " + u.getNick() + " sets modes " + mode + " " + nickPass);
+ }
+
+ public void onMode(String chan, IRCUser u, IRCModeParser mp) {
+ log.info(chan + "> " + u.getNick() + " sets mode: " + mp.getLine());
+ }
+
+ public void onNick(IRCUser u, String nickNew) {
+ log.info("Nick: " + u.getNick() + " is now known as " + nickNew);
+ }
+
+ public void onNotice(String target, IRCUser u, String msg) {
+ log.info(target + "> " + u.getNick() + " (notice): " + msg);
+ }
+
+ public void onPart(String chan, IRCUser u, String msg) {
+ log.info(chan + "> " + u.getNick() + " parts");
+ }
+
+ public void onPrivmsg(String chan, IRCUser u, String msg) {
+ Set<WikipediaFeedListener> listeners = channelListeners.get(chan);
+
+ if (listeners != null) {
+ WikipediaFeedEvent event = new WikipediaFeedEvent(System.currentTimeMillis(), chan, u.getNick(), msg);
+
+ for (WikipediaFeedListener listener : listeners) {
+ listener.onEvent(event);
+ }
+ }
+
+ log.debug(chan + "> " + u.getNick() + ": " + msg);
+ }
+
+ public void onQuit(IRCUser u, String msg) {
+ log.info("Quit: " + u.getNick());
+ }
+
+ public void onReply(int num, String value, String msg) {
+ log.info("Reply #" + num + ": " + value + " " + msg);
+ }
+
+ public void onTopic(String chan, IRCUser u, String topic) {
+ log.info(chan + "> " + u.getNick() + " changes topic into: " + topic);
+ }
+
+ public void onPing(String p) {
+ }
+
+ public void unknown(String a, String b, String c, String d) {
+ log.warn("UNKNOWN: " + a + " " + b + " " + c + " " + d);
+ }
+ }
+
+ public static interface WikipediaFeedListener {
+ void onEvent(WikipediaFeedEvent event);
+ }
+
+ public static final class WikipediaFeedEvent {
+ private final long time;
+ private final String channel;
+ private final String source;
+ private final String rawEvent;
+
+ public WikipediaFeedEvent(long time, String channel, String source, String rawEvent) {
+ this.time = time;
+ this.channel = channel;
+ this.source = source;
+ this.rawEvent = rawEvent;
+ }
+
+ public WikipediaFeedEvent(Map<String, Object> jsonObject) {
+ this((Long) jsonObject.get("time"), (String) jsonObject.get("channel"), (String) jsonObject.get("source"), (String) jsonObject.get("raw"));
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public String getChannel() {
+ return channel;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public String getRawEvent() {
+ return rawEvent;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((channel == null) ? 0 : channel.hashCode());
+ result = prime * result + ((rawEvent == null) ? 0 : rawEvent.hashCode());
+ result = prime * result + ((source == null) ? 0 : source.hashCode());
+ result = prime * result + (int) (time ^ (time >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ WikipediaFeedEvent other = (WikipediaFeedEvent) obj;
+ if (channel == null) {
+ if (other.channel != null)
+ return false;
+ } else if (!channel.equals(other.channel))
+ return false;
+ if (rawEvent == null) {
+ if (other.rawEvent != null)
+ return false;
+ } else if (!rawEvent.equals(other.rawEvent))
+ return false;
+ if (source == null) {
+ if (other.source != null)
+ return false;
+ } else if (!source.equals(other.source))
+ return false;
+ if (time != other.time)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "WikipediaFeedEvent [time=" + time + ", channel=" + channel + ", source=" + source + ", rawEvent=" + rawEvent + "]";
+ }
+
+ public String toJson() {
+ return toJson(this);
+ }
+
+ public static Map<String, Object> toMap(WikipediaFeedEvent event) {
+ Map<String, Object> jsonObject = new HashMap<String, Object>();
+
+ jsonObject.put("time", event.getTime());
+ jsonObject.put("channel", event.getChannel());
+ jsonObject.put("source", event.getSource());
+ jsonObject.put("raw", event.getRawEvent());
+
+ return jsonObject;
+ }
+
+ public static String toJson(WikipediaFeedEvent event) {
+ Map<String, Object> jsonObject = toMap(event);
+
+ try {
+ return jsonMapper.writeValueAsString(jsonObject);
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static WikipediaFeedEvent fromJson(String json) {
+ try {
+ return new WikipediaFeedEvent((Map<String, Object>) jsonMapper.readValue(json, Map.class));
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+ }
+
+ public static void main(String[] args) throws InterruptedException {
+ WikipediaFeed feed = new WikipediaFeed("irc.wikimedia.org", 6667);
+ feed.start();
+
+ feed.listen("#en.wikipedia", new WikipediaFeedListener() {
+ @Override
+ public void onEvent(WikipediaFeedEvent event) {
+ System.out.println(event);
+ }
+ });
+
+ Thread.sleep(20000);
+ feed.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java b/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
new file mode 100644
index 0000000..d1612c9
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/system/WikipediaSystemFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.system;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+
+ /**
+  * {@link SystemFactory} for the Wikipedia feed system. It can hand out an
+  * admin and a consumer; asking for a producer always fails.
+  */
+ public class WikipediaSystemFactory implements SystemFactory {
+   /**
+    * Returns a new {@link SinglePartitionWithoutOffsetsSystemAdmin}; neither
+    * argument is consulted.
+    */
+   @Override
+   public SystemAdmin getAdmin(String systemName, Config config) {
+     return new SinglePartitionWithoutOffsetsSystemAdmin();
+   }
+
+   /**
+    * Builds a consumer backed by a {@code WikipediaFeed} whose host and port
+    * are read from the {@code systems.<systemName>.host} and
+    * {@code systems.<systemName>.port} configuration keys.
+    */
+   @Override
+   public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+     String keyPrefix = "systems." + systemName + ".";
+     String host = config.get(keyPrefix + "host");
+     int port = config.getInt(keyPrefix + "port");
+
+     return new WikipediaConsumer(systemName, new WikipediaFeed(host, port), registry);
+   }
+
+   /**
+    * Always throws: this system does not support producing.
+    *
+    * @throws SamzaException on every call
+    */
+   @Override
+   public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+     throw new SamzaException("You can't produce to a Wikipedia feed! How about making some edits to a Wiki, instead?");
+   }
+ }