You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/09 01:26:07 UTC
[26/33] samza-hello-samza git commit: Update the code based on latest
samza
Update the code based on latest samza
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/48c858ac
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/48c858ac
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/48c858ac
Branch: refs/heads/master
Commit: 48c858ac0343d6ae6a1f1ce4451d6ddab65be903
Parents: e5943a0
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Authored: Thu Oct 26 17:24:16 2017 -0700
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Thu Oct 26 17:24:16 2017 -0700
----------------------------------------------------------------------
bin/deploy.sh | 26 +++++++++++
.../cookbook/PageViewAdClickJoiner.java | 6 +--
.../examples/cookbook/PageViewFilterApp.java | 2 +-
.../cookbook/PageViewSessionizerApp.java | 23 +++++----
.../cookbook/TumblingPageViewCounterApp.java | 14 +++---
.../application/WikipediaApplication.java | 49 +++++++++++++++++++-
6 files changed, 100 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/bin/deploy.sh
----------------------------------------------------------------------
diff --git a/bin/deploy.sh b/bin/deploy.sh
new file mode 100755
index 0000000..51faed1
--- /dev/null
+++ b/bin/deploy.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Build the project with Maven and unpack the resulting distribution tarball
+# into deploy/samza under the repository root.
+
+# Stop at the first failing command, so a failed build is never followed by
+# extracting a stale or missing tarball.
+set -e
+
+# Resolve the repository root (the parent of this script's directory) to an
+# absolute path. Quoting keeps this working when the path contains spaces.
+base_dir="$(cd "$(dirname "$0")/.." && pwd)"
+cd "$base_dir"
+
+mvn clean package
+mkdir -p "$base_dir/deploy/samza"
+# NOTE(review): the tarball name hard-codes version 0.13.1-SNAPSHOT; presumably
+# it has to track the version declared in the pom - confirm when bumping versions.
+tar -xvf "$base_dir/target/hello-samza-0.13.1-SNAPSHOT-dist.tar.gz" -C "$base_dir/deploy/samza"
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
index 4f491f7..f6c3810 100644
--- a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
+++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
@@ -107,17 +107,17 @@ public class PageViewAdClickJoiner implements StreamApplication {
MessageStream<PageView> repartitionedPageViews =
pageViews
- .partitionBy(pv -> pv.pageId, pv -> pv, KVSerde.of(stringSerde, pageViewSerde))
+ .partitionBy(pv -> pv.pageId, pv -> pv, KVSerde.of(stringSerde, pageViewSerde), "pageview")
.map(KV::getValue);
MessageStream<AdClick> repartitionedAdClicks =
adClicks
- .partitionBy(AdClick::getPageId, ac -> ac, KVSerde.of(stringSerde, adClickSerde))
+ .partitionBy(AdClick::getPageId, ac -> ac, KVSerde.of(stringSerde, adClickSerde), "adclick")
.map(KV::getValue);
repartitionedPageViews
.join(repartitionedAdClicks, pageViewAdClickJoinFunction,
- stringSerde, pageViewSerde, adClickSerde, Duration.ofMinutes(3))
+ stringSerde, pageViewSerde, adClickSerde, Duration.ofMinutes(3), "join")
.sendTo(joinResults);
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
index 80ce2d1..a2accfd 100644
--- a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java
@@ -72,7 +72,7 @@ public class PageViewFilterApp implements StreamApplication {
OutputStream<KV<String, PageView>> filteredPageViews = graph.getOutputStream(OUTPUT_TOPIC);
pageViews
- .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+ .partitionBy(kv -> kv.value.userId, kv -> kv.value, "pageview")
.filter(kv -> !INVALID_USER_ID.equals(kv.value.userId))
.sendTo(filteredPageViews);
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
index f1000ae..2bcd9f5 100644
--- a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
+++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
@@ -27,11 +27,13 @@ import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerde;
import samza.examples.cookbook.data.PageView;
import samza.examples.cookbook.data.UserPageViews;
import java.time.Duration;
+import java.util.function.Function;
/**
* In this example, we group page views by userId into sessions, and compute the number of page views for each user
@@ -74,20 +76,25 @@ public class PageViewSessionizerApp implements StreamApplication {
@Override
public void init(StreamGraph graph, Config config) {
- graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)));
+ Serde<String> stringSerde = new StringSerde();
+ Serde<PageView> pageviewSerde = new JsonSerdeV2<>(PageView.class);
+ KVSerde<String, PageView> pageViewKVSerde = KVSerde.of(stringSerde, pageviewSerde);
+ Serde<UserPageViews> userPageviewSerde = new JsonSerdeV2<>(UserPageViews.class);
+ graph.setDefaultSerde(pageViewKVSerde);
MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC);
OutputStream<KV<String, UserPageViews>> userPageViews =
- graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)));
+ graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(stringSerde, userPageviewSerde));
pageViews
- .partitionBy(kv -> kv.value.userId, kv -> kv.value)
- .window(Windows.keyedSessionWindow(kv -> kv.value.userId, Duration.ofSeconds(10)))
+ .partitionBy(kv -> kv.value.userId, kv -> kv.value, "pageview")
+ .window(Windows.keyedSessionWindow(kv -> kv.value.userId,
+ Duration.ofSeconds(10), stringSerde, pageViewKVSerde), "usersession")
.map(windowPane -> {
- String userId = windowPane.getKey().getKey();
- int views = windowPane.getMessage().size();
- return KV.of(userId, new UserPageViews(userId, views));
- })
+ String userId = windowPane.getKey().getKey();
+ int views = windowPane.getMessage().size();
+ return KV.of(userId, new UserPageViews(userId, views));
+ })
.sendTo(userPageViews);
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
index 0809180..acf1411 100644
--- a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
+++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
@@ -25,6 +25,7 @@ import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
@@ -83,14 +84,15 @@ public class TumblingPageViewCounterApp implements StreamApplication {
graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)));
pageViews
- .partitionBy(kv -> kv.value.userId, kv -> kv.value)
+ .partitionBy(kv -> kv.value.userId, kv -> kv.value, "userId")
.window(Windows.keyedTumblingWindow(
- kv -> kv.key, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1))
+ kv -> kv.key, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1,
+ new StringSerde(), new IntegerSerde()), "count")
.map(windowPane -> {
- String userId = windowPane.getKey().getKey();
- int views = windowPane.getMessage();
- return KV.of(userId, new UserPageViews(userId, views));
- })
+ String userId = windowPane.getKey().getKey();
+ int views = windowPane.getMessage();
+ return KV.of(userId, new UserPageViews(userId, views));
+ })
.sendTo(outputStream);
}
}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/48c858ac/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index 736d934..659e373 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -31,6 +31,7 @@ import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.TaskContext;
import org.slf4j.Logger;
@@ -38,6 +39,12 @@ import org.slf4j.LoggerFactory;
import samza.examples.wikipedia.model.WikipediaParser;
import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
@@ -105,7 +112,8 @@ public class WikipediaApplication implements StreamApplication {
// Parse, update stats, prepare output, and send
allWikipediaEvents
.map(WikipediaParser::parseEvent)
- .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()))
+ .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new,
+ new WikipediaStatsAggregator(), WikipediaStats.serde()), "Tumbling window of WikipediaStats")
.map(this::formatOutput)
.sendTo(wikipediaStats);
}
@@ -175,7 +183,7 @@ public class WikipediaApplication implements StreamApplication {
/**
* A few statistics about the incoming messages.
*/
- private static class WikipediaStats {
+ public static class WikipediaStats {
// Windowed stats
int edits = 0;
int byteDiff = 0;
@@ -189,6 +197,43 @@ public class WikipediaApplication implements StreamApplication {
public String toString() {
return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts);
}
+
+ /**
+  * Creates a serde for {@code WikipediaStats} values.
+  *
+  * @return a new {@code WikipediaStatsSerde} instance on every call
+  */
+ static Serde<WikipediaStats> serde() {
+ return new WikipediaStatsSerde();
+ }
+
+ /**
+  * {@link Serde} that converts {@code WikipediaStats} to and from bytes using Java object streams.
+  *
+  * <p>Wire layout, in order: {@code edits} (int), {@code byteDiff} (int), {@code titles}
+  * (serialized object), {@code counts} (serialized object). {@link #toBytes} and
+  * {@link #fromBytes} must read and write the fields in exactly this order.
+  *
+  * <p>NOTE(review): this relies on native Java deserialization, which is only safe for bytes
+  * this application wrote itself; confirm the input never comes from an untrusted source.
+  */
+ public static class WikipediaStatsSerde implements Serde<WikipediaStats> {
+   /**
+    * Rebuilds a {@code WikipediaStats} from bytes previously produced by {@link #toBytes}.
+    *
+    * @param bytes the serialized form
+    * @return a new {@code WikipediaStats} populated from {@code bytes}
+    * @throws RuntimeException wrapping any failure raised while reading
+    */
+   @Override
+   @SuppressWarnings("unchecked") // the casts mirror the object types written by toBytes
+   public WikipediaStats fromBytes(byte[] bytes) {
+     // try-with-resources closes the stream on both the success and the failure path.
+     try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+       WikipediaStats stats = new WikipediaStats();
+       stats.edits = in.readInt();
+       stats.byteDiff = in.readInt();
+       stats.titles = (Set<String>) in.readObject();
+       stats.counts = (Map<String, Integer>) in.readObject();
+       return stats;
+     } catch (Exception e) {
+       throw new RuntimeException(e);
+     }
+   }
+
+   /**
+    * Serializes the given stats using the wire layout described on this class.
+    *
+    * @param wikipediaStats the stats to serialize
+    * @return the serialized bytes
+    * @throws RuntimeException wrapping any failure raised while writing
+    */
+   @Override
+   public byte[] toBytes(WikipediaStats wikipediaStats) {
+     ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+     try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+       out.writeInt(wikipediaStats.edits);
+       out.writeInt(wikipediaStats.byteDiff);
+       out.writeObject(wikipediaStats.titles);
+       out.writeObject(wikipediaStats.counts);
+     } catch (Exception e) {
+       throw new RuntimeException(e);
+     }
+     // Read the buffer only after the object stream has been closed, so that everything
+     // it buffered internally is guaranteed to have been flushed into the byte array.
+     return buffer.toByteArray();
+   }
+ }
}
static class WikipediaStatsOutput {