You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/09 01:26:07 UTC

[26/33] samza-hello-samza git commit: Update the code based on latest samza

Update the code based on latest samza


Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/48c858ac
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/48c858ac
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/48c858ac

Branch: refs/heads/master
Commit: 48c858ac0343d6ae6a1f1ce4451d6ddab65be903
Parents: e5943a0
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Authored: Thu Oct 26 17:24:16 2017 -0700
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Thu Oct 26 17:24:16 2017 -0700

----------------------------------------------------------------------
 bin/deploy.sh                                   | 26 +++++++++++
 .../cookbook/PageViewAdClickJoiner.java         |  6 +--
 .../examples/cookbook/PageViewFilterApp.java    |  2 +-
 .../cookbook/PageViewSessionizerApp.java        | 23 +++++----
 .../cookbook/TumblingPageViewCounterApp.java    | 14 +++---
 .../application/WikipediaApplication.java       | 49 +++++++++++++++++++-
 6 files changed, 100 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/bin/deploy.sh
----------------------------------------------------------------------
diff --git a/bin/deploy.sh b/bin/deploy.sh
new file mode 100755
index 0000000..51faed1
--- /dev/null
+++ b/bin/deploy.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -eu  # abort on the first failure so a failed build is never unpacked
+# Repository root = parent of this script's directory, independent of the caller's cwd.
+base_dir=$(cd "$(dirname "$0")/.." && pwd)
+cd "$base_dir"
+
+mvn clean package
+mkdir -p "$base_dir/deploy/samza"
+tar -xvf "$base_dir/target/hello-samza-0.13.1-SNAPSHOT-dist.tar.gz" -C "$base_dir/deploy/samza"

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
index 4f491f7..f6c3810 100644
--- a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
+++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
@@ -107,17 +107,17 @@ public class PageViewAdClickJoiner implements StreamApplication {
 
     MessageStream<PageView> repartitionedPageViews =
         pageViews
-            .partitionBy(pv -> pv.pageId, pv -> pv, KVSerde.of(stringSerde, pageViewSerde))
+            .partitionBy(pv -> pv.pageId, pv -> pv, KVSerde.of(stringSerde, pageViewSerde), "pageview")
             .map(KV::getValue);
 
     MessageStream<AdClick> repartitionedAdClicks =
         adClicks
-            .partitionBy(AdClick::getPageId, ac -> ac, KVSerde.of(stringSerde, adClickSerde))
+            .partitionBy(AdClick::getPageId, ac -> ac, KVSerde.of(stringSerde, adClickSerde), "adclick")
             .map(KV::getValue);
 
     repartitionedPageViews
         .join(repartitionedAdClicks, pageViewAdClickJoinFunction,
-            stringSerde, pageViewSerde, adClickSerde, Duration.ofMinutes(3))
+            stringSerde, pageViewSerde, adClickSerde, Duration.ofMinutes(3), "join")
         .sendTo(joinResults);
   }
 

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
index 80ce2d1..a2accfd 100644
--- a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
@@ -72,7 +72,7 @@ public class PageViewFilterApp implements StreamApplication {
     OutputStream<KV<String, PageView>> filteredPageViews = graph.getOutputStream(OUTPUT_TOPIC);
 
     pageViews
-        .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+        .partitionBy(kv -> kv.value.userId, kv -> kv.value, "pageview")
         .filter(kv -> !INVALID_USER_ID.equals(kv.value.userId))
         .sendTo(filteredPageViews);
   }

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
index f1000ae..2bcd9f5 100644
--- a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
+++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
@@ -27,11 +27,13 @@ import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
 import samza.examples.cookbook.data.PageView;
 import samza.examples.cookbook.data.UserPageViews;
 
 import java.time.Duration;
+import java.util.function.Function;
 
 /**
  * In this example, we group page views by userId into sessions, and compute the number of page views for each user
@@ -74,20 +76,25 @@ public class PageViewSessionizerApp implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)));
+    Serde<String> stringSerde = new StringSerde();
+    Serde<PageView> pageviewSerde = new JsonSerdeV2<>(PageView.class);
+    KVSerde<String, PageView> pageViewKVSerde = KVSerde.of(stringSerde, pageviewSerde);
+    Serde<UserPageViews> userPageviewSerde = new JsonSerdeV2<>(UserPageViews.class);
+    graph.setDefaultSerde(pageViewKVSerde);
 
     MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC);
     OutputStream<KV<String, UserPageViews>> userPageViews =
-        graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)));
+        graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(stringSerde, userPageviewSerde));
 
     pageViews
-        .partitionBy(kv -> kv.value.userId, kv -> kv.value)
-        .window(Windows.keyedSessionWindow(kv -> kv.value.userId, Duration.ofSeconds(10)))
+        .partitionBy(kv -> kv.value.userId, kv -> kv.value, "pageview")
+        .window(Windows.keyedSessionWindow(kv -> kv.value.userId,
+            Duration.ofSeconds(10), stringSerde, pageViewKVSerde), "usersession")
         .map(windowPane -> {
-            String userId = windowPane.getKey().getKey();
-            int views = windowPane.getMessage().size();
-            return KV.of(userId, new UserPageViews(userId, views));
-          })
+          String userId = windowPane.getKey().getKey();
+          int views = windowPane.getMessage().size();
+          return KV.of(userId, new UserPageViews(userId, views));
+        })
         .sendTo(userPageViews);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
index 0809180..acf1411 100644
--- a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
+++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
@@ -25,6 +25,7 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
@@ -83,14 +84,15 @@ public class TumblingPageViewCounterApp implements StreamApplication {
         graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)));
 
     pageViews
-        .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+        .partitionBy(kv -> kv.value.userId, kv -> kv.value, "userId")
         .window(Windows.keyedTumblingWindow(
-            kv -> kv.key, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1))
+            kv -> kv.key, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1,
+            new StringSerde(), new IntegerSerde()), "count")
         .map(windowPane -> {
-            String userId = windowPane.getKey().getKey();
-            int views = windowPane.getMessage();
-            return KV.of(userId, new UserPageViews(userId, views));
-          })
+          String userId = windowPane.getKey().getKey();
+          int views = windowPane.getMessage();
+          return KV.of(userId, new UserPageViews(userId, views));
+        })
         .sendTo(outputStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index 736d934..659e373 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -31,6 +31,7 @@ import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.TaskContext;
 import org.slf4j.Logger;
@@ -38,6 +39,12 @@ import org.slf4j.LoggerFactory;
 import samza.examples.wikipedia.model.WikipediaParser;
 import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -105,7 +112,8 @@ public class WikipediaApplication implements StreamApplication {
     // Parse, update stats, prepare output, and send
     allWikipediaEvents
         .map(WikipediaParser::parseEvent)
-        .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()))
+        .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new,
+                new WikipediaStatsAggregator(), WikipediaStats.serde()), "Tumbling window of WikipediaStats")
         .map(this::formatOutput)
         .sendTo(wikipediaStats);
   }
@@ -175,7 +183,7 @@ public class WikipediaApplication implements StreamApplication {
   /**
    * A few statistics about the incoming messages.
    */
-  private static class WikipediaStats {
+  public static class WikipediaStats {
     // Windowed stats
     int edits = 0;
     int byteDiff = 0;
@@ -189,6 +197,43 @@ public class WikipediaApplication implements StreamApplication {
     public String toString() {
       return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
     }
+
+    static Serde<WikipediaStats> serde() { // factory for the serde passed to Windows.tumblingWindow
+      return new WikipediaStatsSerde();
+    }
+
+    /** Serde for {@link WikipediaStats} using Java object streams; both methods use the same field order. */
+    public static class WikipediaStatsSerde implements Serde<WikipediaStats> {
+      @Override
+      @SuppressWarnings("unchecked") // readObject() returns Object; toBytes wrote a Set, then a Map
+      public WikipediaStats fromBytes(byte[] bytes) {
+        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+          WikipediaStats stats = new WikipediaStats();
+          stats.edits = ois.readInt();
+          stats.byteDiff = ois.readInt();
+          stats.titles = (Set<String>) ois.readObject();
+          stats.counts = (Map<String, Integer>) ois.readObject();
+          return stats;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public byte[] toBytes(WikipediaStats wikipediaStats) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+          oos.writeInt(wikipediaStats.edits);
+          oos.writeInt(wikipediaStats.byteDiff);
+          oos.writeObject(wikipediaStats.titles);
+          oos.writeObject(wikipediaStats.counts);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        // Read the buffer only after close(), which flushes anything the object stream still holds.
+        return baos.toByteArray();
+      }
+    }
   }
 
   static class WikipediaStatsOutput {