You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/12/09 19:15:09 UTC

[2/3] incubator-samza-hello-samza git commit: SAMZA-495; upgrade to 0.8.0 samza

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
new file mode 100644
index 0000000..07cd8ac
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.task;
+
+import java.util.Map;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+
+/**
+ * Minimal pass-through task: each incoming Wikipedia feed event is converted
+ * to a map and sent to the Kafka stream named wikipedia-raw.
+ */
+public class WikipediaFeedStreamTask implements StreamTask {
+  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-raw");
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    WikipediaFeedEvent feedEvent = (WikipediaFeedEvent) envelope.getMessage();
+    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, WikipediaFeedEvent.toMap(feedEvent)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
new file mode 100644
index 0000000..0505f58
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.task;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+
+/**
+ * Parses the raw text of Wikipedia feed events into structured maps and sends
+ * the result to the Kafka stream named wikipedia-edits.
+ */
+public class WikipediaParserStreamTask implements StreamTask {
+  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-edits");
+
+  /**
+   * Layout of a raw edit line: [[title]] flags diff-url * user * (byte-diff) summary.
+   * Compiled once and shared; Pattern instances are immutable, so there is no
+   * need to recompile the expression for every message. The leading "." in the
+   * byte-diff group is what admits a minus sign in front of the digits.
+   */
+  private static final Pattern EDIT_LINE_PATTERN =
+      Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)");
+
+  /**
+   * Rebuilds a WikipediaFeedEvent from the incoming map, parses its raw event
+   * text, adds the event's channel, source and time to the parsed map, and
+   * sends the result to the output stream.
+   *
+   * @param envelope incoming message; its payload is cast to a Map
+   * @param collector used to send the parsed edit
+   * @param coordinator not used by this task
+   */
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    @SuppressWarnings("unchecked")
+    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
+    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
+
+    try {
+      Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
+
+      parsedJsonObject.put("channel", event.getChannel());
+      parsedJsonObject.put("source", event.getSource());
+      parsedJsonObject.put("time", event.getTime());
+
+      collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, parsedJsonObject));
+    } catch (Exception e) {
+      // Best effort: any failure inside the try block (parsing or sending) is
+      // reported and the message is dropped, so one bad line does not stop the
+      // task. The cause is included so the failure can be diagnosed.
+      System.err.println("Unable to parse line: " + event + " (" + e + ")");
+    }
+  }
+
+  /**
+   * Parses one raw edit line into a map with the keys title, user,
+   * unparsed-flags, diff-bytes, diff-url, summary and flags. The flags entry
+   * is itself a map of booleans (is-minor, is-new, is-unpatrolled,
+   * is-bot-edit, is-special, is-talk) derived from the flag characters and
+   * from the title prefix.
+   *
+   * @param line raw edit line
+   * @return a new mutable map describing the edit
+   * @throws IllegalArgumentException if the line does not match the expected
+   *         layout, or (as NumberFormatException) if the captured byte diff
+   *         is not a valid int
+   */
+  public static Map<String, Object> parse(String line) {
+    Matcher m = EDIT_LINE_PATTERN.matcher(line);
+
+    // A successful find() always exposes all six groups; the group count is a
+    // fixed property of the pattern, so it needs no separate check.
+    if (!m.find()) {
+      throw new IllegalArgumentException("Line does not match the expected edit format: " + line);
+    }
+
+    String title = m.group(1);
+    String flags = m.group(2);
+    String diffUrl = m.group(3);
+    String user = m.group(4);
+    int byteDiff = Integer.parseInt(m.group(5));
+    String summary = m.group(6);
+
+    Map<String, Boolean> flagMap = new HashMap<String, Boolean>();
+
+    flagMap.put("is-minor", flags.contains("M"));
+    flagMap.put("is-new", flags.contains("N"));
+    flagMap.put("is-unpatrolled", flags.contains("!"));
+    flagMap.put("is-bot-edit", flags.contains("B"));
+    flagMap.put("is-special", title.startsWith("Special:"));
+    flagMap.put("is-talk", title.startsWith("Talk:"));
+
+    Map<String, Object> root = new HashMap<String, Object>();
+
+    root.put("title", title);
+    root.put("user", user);
+    root.put("unparsed-flags", flags);
+    root.put("diff-bytes", byteDiff);
+    root.put("diff-url", diffUrl);
+    root.put("summary", summary);
+    root.put("flags", flagMap);
+
+    return root;
+  }
+
+  /**
+   * Manual smoke test: parses two sample edit lines and prints the results.
+   *
+   * @param args ignored
+   */
+  public static void main(String[] args) {
+    String[] lines = new String[] {
+      "[[Wikipedia talk:Articles for creation/Lords of War]]  http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]",
+      "[[David Shepard (surgeon)]] M http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * Jacobsievers * (+115) /* American Revolution (1775�1783) */  Added to note regarding David Shepard's brothers"
+    };
+
+    for (String line : lines) {
+      System.out.println(parse(line));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
new file mode 100644
index 0000000..60fd93d
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.task;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.WindowableTask;
+
+/**
+ * Aggregates statistics over Wikipedia edits. Per-window counters are kept in
+ * memory and sent to the Kafka stream named wikipedia-stats each time
+ * window() is called; an all-time edit count is kept in the key-value store
+ * named wikipedia-stats.
+ */
+public class WikipediaStatsStreamTask implements StreamTask, InitableTask, WindowableTask {
+  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-stats");
+  private static final String STORE_NAME = "wikipedia-stats";
+  private static final String ALL_TIME_EDITS_KEY = "count-edits-all-time";
+
+  private int edits = 0;
+  private int byteDiff = 0;
+  private Set<String> titles = new HashSet<String>();
+  private Map<String, Integer> counts = new HashMap<String, Integer>();
+  private KeyValueStore<String, Integer> store;
+
+  /**
+   * Looks up the key-value store that holds the all-time edit count.
+   *
+   * @param config not used by this task
+   * @param context task context the store is obtained from
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void init(Config config, TaskContext context) {
+    this.store = (KeyValueStore<String, Integer>) context.getStore(STORE_NAME);
+  }
+
+  /**
+   * Folds one parsed edit into the statistics: increments the stored all-time
+   * count, the window's edit count and byte total, records the title, and
+   * counts every flag whose value is true.
+   *
+   * @param envelope incoming message; its payload is cast to a Map that is
+   *        expected to contain "title", "diff-bytes" and "flags" entries
+   * @param collector not used here; output is produced in window()
+   * @param coordinator not used by this task
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    Map<String, Object> edit = (Map<String, Object>) envelope.getMessage();
+    Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags");
+
+    store.put(ALL_TIME_EDITS_KEY, readAllTimeEdits() + 1);
+
+    edits += 1;
+    titles.add((String) edit.get("title"));
+    byteDiff += (Integer) edit.get("diff-bytes");
+
+    for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
+      if (Boolean.TRUE.equals(flag.getValue())) {
+        Integer count = counts.get(flag.getKey());
+
+        if (count == null) {
+          count = 0;
+        }
+
+        count += 1;
+        counts.put(flag.getKey(), count);
+      }
+    }
+  }
+
+  /**
+   * Sends the statistics gathered since the previous call, together with the
+   * all-time edit count, and then resets the per-window state.
+   *
+   * @param collector used to send the statistics map
+   * @param coordinator not used by this task
+   */
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) {
+    counts.put("edits", edits);
+    counts.put("bytes-added", byteDiff);
+    counts.put("unique-titles", titles.size());
+    // Use the same default as process() so a window that fires before any
+    // edit has been stored reports 0 instead of a null value.
+    counts.put("edits-all-time", readAllTimeEdits());
+
+    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, counts));
+
+    // Reset counts after windowing. Fresh collections are allocated instead of
+    // clearing in place because the previous map was just handed to the
+    // collector (presumably it may still be referenced there - confirm).
+    edits = 0;
+    byteDiff = 0;
+    titles = new HashSet<String>();
+    counts = new HashMap<String, Integer>();
+  }
+
+  /** Returns the stored all-time edit count, or 0 when nothing is stored yet. */
+  private int readAllTimeEdits() {
+    Integer editsAllTime = store.get(ALL_TIME_EDITS_KEY);
+    return editsAllTime == null ? 0 : editsAllTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/f9efa43a/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
new file mode 100644
index 0000000..f0de765
--- /dev/null
+++ b/src/main/resources/log4j.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <!-- Samza-provided appender; presumably exposes log activity over JMX (confirm against the Samza docs). -->
+  <appender name="jmx" class="org.apache.samza.logging.log4j.JmxAppender" />
+
+  <!--
+    File appender. The log path is built from the samza.log.dir and
+    samza.container.name substitution properties; the DatePattern below has
+    day granularity, which per the log4j DailyRollingFileAppender docs means
+    one roll-over per day with the date appended to the rolled file name.
+  -->
+  <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender">
+     <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
+     <param name="DatePattern" value="'.'yyyy-MM-dd" />
+     <layout class="org.apache.log4j.PatternLayout">
+      <!-- Line layout: timestamp, last component of the logger name, level in brackets, message. -->
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
+     </layout>
+  </appender>
+  <!-- Root logger: info level, attached to both appenders defined above. -->
+  <root>
+    <priority value="info" />
+    <appender-ref ref="RollingAppender"/>
+    <appender-ref ref="jmx" />
+  </root>
+</log4j:configuration>