You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/05/05 23:55:15 UTC
samza-hello-samza git commit: SAMZA-1236: Initial draft of the fluent
API example for tutorials
Repository: samza-hello-samza
Updated Branches:
refs/heads/latest c87ed565f -> 3d0e919e6
SAMZA-1236: Initial draft of the fluent API example for tutorials
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>
Closes #11 from jmakes/samza-1236
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/3d0e919e
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/3d0e919e
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/3d0e919e
Branch: refs/heads/latest
Commit: 3d0e919e6cf96ffab7c2028e5ebef2bd99624346
Parents: c87ed56
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri May 5 16:48:21 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri May 5 16:48:21 2017 -0700
----------------------------------------------------------------------
build.gradle | 1 +
src/main/assembly/src.xml | 5 +
.../config/wikipedia-application.properties | 71 +++++++
src/main/config/wikipedia-parser.properties | 6 -
src/main/config/wikipedia-stats.properties | 6 +
.../application/WikipediaApplication.java | 188 +++++++++++++++++++
.../wikipedia/model/WikipediaParser.java | 80 ++++++++
.../task/WikipediaParserStreamTask.java | 57 +-----
.../task/WikipediaStatsStreamTask.java | 24 ++-
src/main/resources/log4j.xml | 9 +-
10 files changed, 376 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 40505ce..ec451d5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -71,6 +71,7 @@ task distTar(dependsOn: build, type: Tar) {
include "wikipedia-feed.properties"
include "wikipedia-parser.properties"
include "wikipedia-stats.properties"
+ include "wikipedia-application.properties"
// expand the Maven tokens with Gradle equivalents. Also change 'target' (Maven) to 'build/distributions' (Gradle)
filter { String line ->
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index e280a9a..ca90ebf 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -52,6 +52,11 @@
<filtered>true</filtered>
</file>
<file>
+ <source>${basedir}/src/main/config/wikipedia-application.properties</source>
+ <outputDirectory>config</outputDirectory>
+ <filtered>true</filtered>
+ </file>
+ <file>
<source>${basedir}/src/main/config/tumbling-pageview-counter.properties</source>
<outputDirectory>config</outputDirectory>
<filtered>true</filtered>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/config/wikipedia-application.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-application.properties b/src/main/config/wikipedia-application.properties
new file mode 100644
index 0000000..59a124f
--- /dev/null
+++ b/src/main/config/wikipedia-application.properties
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=wikipedia-application
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task/Application
+app.runner.class=org.apache.samza.runtime.RemoteApplicationRunner
+app.class=samza.examples.wikipedia.application.WikipediaApplication
+task.window.ms=10000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
+systems.kafka.default.stream.samza.msg.serde=json
+
+# Streams which are not on default system or have special characters in the physical name.
+streams.en-wikipedia.samza.system=wikipedia
+streams.en-wikipedia.samza.physical.name=#en.wikipedia
+
+streams.en-wiktionary.samza.system=wikipedia
+streams.en-wiktionary.samza.physical.name=#en.wiktionary
+
+streams.en-wikinews.samza.system=wikipedia
+streams.en-wikinews.samza.physical.name=#en.wikinews
+
+# Key-value storage
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
+stores.wikipedia-stats.key.serde=string
+stores.wikipedia-stats.msg.serde=integer
+
+# Defaults
+job.default.system=kafka
+
+# Metrics
+metrics.reporters=snapshot,jmx
+metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+metrics.reporter.snapshot.stream=kafka.metrics
+metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
+
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/config/wikipedia-parser.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-parser.properties b/src/main/config/wikipedia-parser.properties
index 6d1e3df..e8f3fa0 100644
--- a/src/main/config/wikipedia-parser.properties
+++ b/src/main/config/wikipedia-parser.properties
@@ -26,12 +26,6 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-
task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
task.inputs=kafka.wikipedia-raw
-# Metrics
-metrics.reporters=snapshot,jmx
-metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
-metrics.reporter.snapshot.stream=kafka.metrics
-metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
-
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/config/wikipedia-stats.properties
----------------------------------------------------------------------
diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties
index f6b85bf..0a1cf31 100644
--- a/src/main/config/wikipedia-stats.properties
+++ b/src/main/config/wikipedia-stats.properties
@@ -27,6 +27,12 @@ task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask
task.inputs=kafka.wikipedia-edits
task.window.ms=10000
+# Metrics
+metrics.reporters=snapshot,jmx
+metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+metrics.reporter.snapshot.stream=kafka.metrics
+metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
+
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
new file mode 100644
index 0000000..b0779db
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.application;
+
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import samza.examples.wikipedia.model.WikipediaParser;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+
+
+/**
+ * This {@link StreamApplication} demonstrates the Samza fluent API by performing the same operations as
+ * {@link samza.examples.wikipedia.task.WikipediaFeedStreamTask},
+ * {@link samza.examples.wikipedia.task.WikipediaParserStreamTask}, and
+ * {@link samza.examples.wikipedia.task.WikipediaStatsStreamTask} in one expression.
+ *
+ * The only functional difference is the lack of "wikipedia-raw" and "wikipedia-edits"
+ * streams to connect the operators, as they are not needed with the fluent API.
+ *
+ * The application processes Wikipedia events in the following steps:
+ * <ul>
+ * <li>Merge wikipedia, wiktionary, and wikinews events into one stream</li>
+ * <li>Parse each event to a more structured format</li>
+ * <li>Aggregate some stats over a 10s window</li>
+ * <li>Format each window output for public consumption</li>
+ * <li>Send the window output to Kafka</li>
+ * </ul>
+ *
+ * All of this application logic is defined in the {@link #init(StreamGraph, Config)} method, which
+ * is invoked by the framework to load the application.
+ */
+public class WikipediaApplication implements StreamApplication {
+  private static final Logger log = LoggerFactory.getLogger(WikipediaApplication.class);
+
+  // Name of the key-value store and the key under which the all-time edit count is persisted.
+  private static final String STATS_STORE_NAME = "wikipedia-stats";
+  private static final String EDIT_COUNT_KEY = "count-edits-all-time";
+
+  // Ids of the three input streams that are merged into a single stream of edits.
+  private static final String WIKIPEDIA_STREAM_ID = "en-wikipedia";
+  private static final String WIKTIONARY_STREAM_ID = "en-wiktionary";
+  private static final String WIKINEWS_STREAM_ID = "en-wikinews";
+
+  // Id of the output stream that receives one stats message per window.
+  private static final String STATS_STREAM_ID = "wikipedia-stats";
+
+  /**
+   * Builds the processing graph: merge the three inputs, parse each event, fold the parsed
+   * edits into {@link WikipediaStats} over 10 second tumbling windows, format each window
+   * result and send it to the stats output stream.
+   *
+   * @param graph the graph used to obtain the input and output streams
+   * @param config the job configuration (not read by this method)
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    // Inputs
+    // Messages come from WikipediaConsumer so we know the type is WikipediaFeedEvent
+    // They are un-keyed, so the 'k' parameter to the msgBuilder is not used
+    MessageStream<WikipediaFeedEvent> wikipediaEvents =
+        graph.getInputStream(WIKIPEDIA_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+    MessageStream<WikipediaFeedEvent> wiktionaryEvents =
+        graph.getInputStream(WIKTIONARY_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+    MessageStream<WikipediaFeedEvent> wikiNewsEvents =
+        graph.getInputStream(WIKINEWS_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v);
+
+    // Output (also un-keyed, so no keyExtractor)
+    OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats =
+        graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m);
+
+    // Merge inputs
+    MessageStream<WikipediaFeedEvent> allWikipediaEvents =
+        wikipediaEvents.merge(ImmutableList.of(wiktionaryEvents, wikiNewsEvents));
+
+    // Parse, update stats, prepare output, and send
+    allWikipediaEvents.map(WikipediaParser::parseEvent)
+        .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()))
+        .map(this::formatOutput)
+        .sendTo(wikipediaStats);
+  }
+
+  /**
+   * A few statistics about the incoming messages.
+   */
+  private static class WikipediaStats {
+    // Windowed stats
+    int edits = 0;
+    int byteDiff = 0;
+    Set<String> titles = new HashSet<String>();
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+
+    // Total stats
+    int totalEdits = 0;
+
+    @Override
+    public String toString() {
+      return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
+    }
+  }
+
+  /**
+   * Updates the windowed and total stats based on each "edit" event.
+   *
+   * Uses a KeyValueStore to persist a total edit count across restarts.
+   *
+   * Declared static because it only touches static members of the enclosing class, so it
+   * does not need a hidden reference to the application instance.
+   */
+  private static class WikipediaStatsAggregator implements FoldLeftFunction<Map<String, Object>, WikipediaStats> {
+
+    private KeyValueStore<String, Integer> store;
+
+    // Example metric. Running counter of the number of repeat edits of the same title within a single window.
+    private Counter repeatEdits;
+
+    /**
+     * {@inheritDoc}
+     * Override {@link org.apache.samza.operators.functions.InitableFunction#init(Config, TaskContext)} to
+     * get a KeyValueStore for persistence and the MetricsRegistry for metrics.
+     */
+    @Override
+    @SuppressWarnings("unchecked") // the store is looked up by name; its key/value types are not checked here
+    public void init(Config config, TaskContext context) {
+      store = (KeyValueStore<String, Integer>) context.getStore(STATS_STORE_NAME);
+      repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits");
+    }
+
+    /**
+     * Folds one parsed edit into the stats of the current window.
+     *
+     * @param edit the parsed edit; expected to carry "title", "diff-bytes" and "flags" entries.
+     *             May be null when the upstream parser could not parse the raw line.
+     * @param stats the accumulated stats for the current window
+     * @return the same {@code stats} instance, updated in place
+     */
+    @Override
+    @SuppressWarnings("unchecked") // "flags" is stored as Object in the parsed edit map
+    public WikipediaStats apply(Map<String, Object> edit, WikipediaStats stats) {
+      // WikipediaParser.parseEvent returns null for lines it cannot parse; skip those
+      // instead of failing with a NullPointerException below.
+      if (edit == null) {
+        return stats;
+      }
+
+      // Update persisted total
+      Integer editsAllTime = store.get(EDIT_COUNT_KEY);
+      if (editsAllTime == null) {
+        editsAllTime = 0;
+      }
+      editsAllTime++;
+      store.put(EDIT_COUNT_KEY, editsAllTime);
+
+      // Update window stats
+      stats.edits++;
+      stats.totalEdits = editsAllTime;
+      stats.byteDiff += (Integer) edit.get("diff-bytes");
+      boolean newTitle = stats.titles.add((String) edit.get("title"));
+
+      Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags");
+      for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
+        if (Boolean.TRUE.equals(flag.getValue())) {
+          // Start at 1 for the first occurrence of a flag, then increment.
+          stats.counts.merge(flag.getKey(), 1, Integer::sum);
+        }
+      }
+
+      if (!newTitle) {
+        repeatEdits.inc();
+        log.info("Frequent edits for title: {}", edit.get("title"));
+      }
+      return stats;
+    }
+  }
+
+  /**
+   * Format the stats for output to Kafka.
+   *
+   * @param statsWindowPane the window result carrying the aggregated {@link WikipediaStats}
+   * @return a new map holding the per-flag counts plus the "edits", "edits-all-time",
+   *         "bytes-added" and "unique-titles" totals
+   */
+  private Map<String, Integer> formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
+
+    WikipediaStats stats = statsWindowPane.getMessage();
+
+    // Copy so the window's own state is not modified by the extra summary entries.
+    Map<String, Integer> counts = new HashMap<String, Integer>(stats.counts);
+    counts.put("edits", stats.edits);
+    counts.put("edits-all-time", stats.totalEdits);
+    counts.put("bytes-added", stats.byteDiff);
+    counts.put("unique-titles", stats.titles.size());
+
+    return counts;
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java b/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java
new file mode 100644
index 0000000..9347962
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.model;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import samza.examples.wikipedia.system.WikipediaFeed;
+
+
+/**
+ * Static helpers that turn raw Wikipedia feed lines into structured maps.
+ */
+public class WikipediaParser {
+  // Compiled once and reused; Pattern instances are immutable and safe to share.
+  // Groups: 1 = title, 2 = flags, 3 = diff url, 4 = user, 5 = byte diff, 6 = summary.
+  // NOTE(review): group 5 begins with '.', which accepts any character (presumably meant
+  // to admit a leading '-'); left unchanged to preserve which lines are accepted.
+  private static final Pattern EDIT_LINE_PATTERN =
+      Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)");
+
+  /**
+   * Parses the raw line of a feed event and adds the event's channel, source and time.
+   *
+   * @param wikipediaFeedEvent the event to parse
+   * @return the parsed fields plus "channel", "source" and "time"; {@code null} if the raw
+   *         line could not be parsed (a message is printed to standard error in that case)
+   */
+  public static Map<String, Object> parseEvent(WikipediaFeed.WikipediaFeedEvent wikipediaFeedEvent) {
+    Map<String, Object> parsedJsonObject = null;
+    try {
+      parsedJsonObject = WikipediaParser.parseLine(wikipediaFeedEvent.getRawEvent());
+
+      parsedJsonObject.put("channel", wikipediaFeedEvent.getChannel());
+      parsedJsonObject.put("source", wikipediaFeedEvent.getSource());
+      parsedJsonObject.put("time", wikipediaFeedEvent.getTime());
+    } catch (Exception e) {
+      // Best effort: callers treat a null result as "skip this event".
+      System.err.println("Unable to parse line: " + wikipediaFeedEvent);
+    }
+    return parsedJsonObject;
+  }
+
+  /**
+   * Parses one raw edit line into its fields.
+   *
+   * @param line the raw line, e.g. {@code [[Title]] M http://... * User * (+115) summary}
+   * @return a mutable map with the keys "title", "user", "unparsed-flags", "diff-bytes",
+   *         "diff-url", "summary" and "flags" (a map of boolean flags)
+   * @throws IllegalArgumentException if the line does not match the expected format or its
+   *         byte diff is not a valid integer
+   */
+  public static Map<String, Object> parseLine(String line) {
+    Matcher m = EDIT_LINE_PATTERN.matcher(line);
+
+    if (!m.find()) {
+      throw new IllegalArgumentException("Line does not match the expected edit format: " + line);
+    }
+
+    String title = m.group(1);
+    String flags = m.group(2);
+    String diffUrl = m.group(3);
+    String user = m.group(4);
+    int byteDiff = Integer.parseInt(m.group(5));
+    String summary = m.group(6);
+
+    Map<String, Boolean> flagMap = new HashMap<String, Boolean>();
+
+    flagMap.put("is-minor", flags.contains("M"));
+    flagMap.put("is-new", flags.contains("N"));
+    flagMap.put("is-unpatrolled", flags.contains("!"));
+    flagMap.put("is-bot-edit", flags.contains("B"));
+    flagMap.put("is-special", title.startsWith("Special:"));
+    flagMap.put("is-talk", title.startsWith("Talk:"));
+
+    Map<String, Object> root = new HashMap<String, Object>();
+
+    root.put("title", title);
+    root.put("user", user);
+    root.put("unparsed-flags", flags);
+    root.put("diff-bytes", byteDiff);
+    root.put("diff-url", diffUrl);
+    root.put("summary", summary);
+    root.put("flags", flagMap);
+
+    return root;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
index 0505f58..aee8939 100644
--- a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
@@ -19,72 +19,29 @@
package samza.examples.wikipedia.task;
-import java.util.HashMap;
import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
+import samza.examples.wikipedia.model.WikipediaParser;
import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
public class WikipediaParserStreamTask implements StreamTask {
+ private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-edits");
+
@SuppressWarnings("unchecked")
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
- try {
- Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
-
- parsedJsonObject.put("channel", event.getChannel());
- parsedJsonObject.put("source", event.getSource());
- parsedJsonObject.put("time", event.getTime());
-
- collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
- } catch (Exception e) {
- System.err.println("Unable to parse line: " + event);
- }
- }
-
- public static Map<String, Object> parse(String line) {
- Pattern p = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)");
- Matcher m = p.matcher(line);
-
- if (m.find() && m.groupCount() == 6) {
- String title = m.group(1);
- String flags = m.group(2);
- String diffUrl = m.group(3);
- String user = m.group(4);
- int byteDiff = Integer.parseInt(m.group(5));
- String summary = m.group(6);
-
- Map<String, Boolean> flagMap = new HashMap<String, Boolean>();
-
- flagMap.put("is-minor", flags.contains("M"));
- flagMap.put("is-new", flags.contains("N"));
- flagMap.put("is-unpatrolled", flags.contains("!"));
- flagMap.put("is-bot-edit", flags.contains("B"));
- flagMap.put("is-special", title.startsWith("Special:"));
- flagMap.put("is-talk", title.startsWith("Talk:"));
-
- Map<String, Object> root = new HashMap<String, Object>();
-
- root.put("title", title);
- root.put("user", user);
- root.put("unparsed-flags", flags);
- root.put("diff-bytes", byteDiff);
- root.put("diff-url", diffUrl);
- root.put("summary", summary);
- root.put("flags", flagMap);
+ Map<String, Object> parsedJsonObject = WikipediaParser.parseEvent(event);
- return root;
- } else {
- throw new IllegalArgumentException();
+ if (parsedJsonObject != null) {
+ collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, parsedJsonObject));
}
}
@@ -92,7 +49,7 @@ public class WikipediaParserStreamTask implements StreamTask {
String[] lines = new String[] { "[[Wikipedia talk:Articles for creation/Lords of War]] http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]", "[[David Shepard (surgeon)]] M http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * Jacobsievers * (+115) /* American Revolution (1775–1783) */ Added to note regarding David Shepard's brothers" };
for (String line : lines) {
- System.out.println(parse(line));
+ System.out.println(WikipediaParser.parseLine(line));
}
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
index 60fd93d..abe760a 100644
--- a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -36,14 +37,20 @@ import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;
public class WikipediaStatsStreamTask implements StreamTask, InitableTask, WindowableTask {
+ private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-stats");
+
private int edits = 0;
private int byteDiff = 0;
private Set<String> titles = new HashSet<String>();
private Map<String, Integer> counts = new HashMap<String, Integer>();
private KeyValueStore<String, Integer> store;
+ // Example metric. Running counter of the number of repeat edits of the same title within a single window.
+ private Counter repeatEdits;
+
public void init(Config config, TaskContext context) {
this.store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats");
+ this.repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits");
}
@SuppressWarnings("unchecked")
@@ -57,21 +64,18 @@ public class WikipediaStatsStreamTask implements StreamTask, InitableTask, Windo
store.put("count-edits-all-time", editsAllTime + 1);
edits += 1;
- titles.add((String) edit.get("title"));
byteDiff += (Integer) edit.get("diff-bytes");
+ boolean newTitle = titles.add((String) edit.get("title"));
for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
if (Boolean.TRUE.equals(flag.getValue())) {
- Integer count = counts.get(flag.getKey());
-
- if (count == null) {
- count = 0;
- }
-
- count += 1;
- counts.put(flag.getKey(), count);
+ counts.compute(flag.getKey(), (k, v) -> v == null ? 0 : v + 1);
}
}
+
+ if (!newTitle) {
+ repeatEdits.inc();
+ }
}
@Override
@@ -81,7 +85,7 @@ public class WikipediaStatsStreamTask implements StreamTask, InitableTask, Windo
counts.put("unique-titles", titles.size());
counts.put("edits-all-time", store.get("count-edits-all-time"));
- collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-stats"), counts));
+ collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, counts));
// Reset counts after windowing.
edits = 0;
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
index 086d6b8..805d5ca 100644
--- a/src/main/resources/log4j.xml
+++ b/src/main/resources/log4j.xml
@@ -40,14 +40,13 @@
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
</layout>
</appender>
+ <logger name="STARTUP_LOGGER" additivity="false">
+ <level value="info" />
+ <appender-ref ref="StartupAppender"/>
+ </logger>
<root>
<priority value="info" />
<appender-ref ref="RollingAppender"/>
<appender-ref ref="jmx" />
</root>
- <logger name="STARTUP_LOGGER" additivity="false">
- <level value="info" />
- <appender-ref ref="StartupAppender"/>
- </logger>
-
</log4j:configuration>