You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/01/07 16:50:15 UTC
[incubator-heron] branch master updated: Add a new streamlet integration test with keyBy, countByKey and … (#3140)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 23f3ff2 Add a new streamlet integration test with keyBy, countByKey and … (#3140)
23f3ff2 is described below
commit 23f3ff246fcf968971d8d83fff656b5031375291
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Mon Jan 7 08:50:09 2019 -0800
Add a new streamlet integration test with keyBy, countByKey and … (#3140)
* Add a new Java streamlet integration test with keyBy, countByKey and reduceByKey
* Scala version
---
.../StreamletWithKeybyCountAndReduce.java | 139 +++++++++++++++++++++
.../StreamletWithKeybyCountAndReduceResults.json | 1 +
.../src/python/test_runner/resources/test.json | 10 ++
.../ScalaStreamletWithKeybyCountAndReduce.scala | 118 +++++++++++++++++
...alaStreamletWithKeybyCountAndReduceResults.json | 26 ++++
5 files changed, 294 insertions(+)
diff --git a/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduce.java b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduce.java
new file mode 100644
index 0000000..5e0c2b9
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduce.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_test.topology.streamlet_with_keyby_count_and_reduce;
+
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.integration_test.common.AbstractTestTopology;
+import org.apache.heron.integration_test.core.TestTopologyBuilder;
+import org.apache.heron.streamlet.Builder;
+import org.apache.heron.streamlet.SerializableFunction;
+import org.apache.heron.streamlet.Streamlet;
+import org.apache.heron.streamlet.StreamletReducers;
+import org.apache.heron.streamlet.impl.BuilderImpl;
+
+/**
+ * Integration test topology that exercises the streamlet keyBy, countByKey and
+ * reduceByKey operations on a stream of month names keyed by season.
+ */
+class StreamletWithKeybyCountAndReduce extends AbstractTestTopology {
+  private static final String MONTHS = "january - february - march - april - may - june"
+      + " - july - august - september - october - november - december";
+  private static final Set<String> SPRING_MONTHS =
+      new HashSet<>(Arrays.asList("march", "april", "may"));
+  private static final Set<String> SUMMER_MONTHS =
+      new HashSet<>(Arrays.asList("june", "july", "august"));
+  private static final Set<String> FALL_MONTHS =
+      new HashSet<>(Arrays.asList("september", "october", "november"));
+  private static final Set<String> WINTER_MONTHS =
+      new HashSet<>(Arrays.asList("december", "january", "february"));
+  // Lower-cased month names already let through by the "unique-months" filter.
+  private static Set<String> incomingMonths = new HashSet<>();
+
+  StreamletWithKeybyCountAndReduce(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  /** Maps a month name to its season, or "really?" when the name is not recognized. */
+  private static String seasonOf(String month) {
+    if (SPRING_MONTHS.contains(month)) {
+      return "spring";
+    }
+    if (SUMMER_MONTHS.contains(month)) {
+      return "summer";
+    }
+    if (FALL_MONTHS.contains(month)) {
+      return "fall";
+    }
+    if (WINTER_MONTHS.contains(month)) {
+      return "winter";
+    }
+    return "really?";
+  }
+
+  /** Returns the number of days in the named month, or -1 for an unknown name. */
+  private static Integer daysIn(String month) {
+    switch (month) {
+      case "february":
+        return 28; // Leap years are ignored; don't use this code in real projects
+      case "april":
+      case "june":
+      case "september":
+      case "november":
+        return 30;
+      case "january":
+      case "march":
+      case "may":
+      case "july":
+      case "august":
+      case "october":
+      case "december":
+        return 31;
+      default:
+        return -1; // Shouldn't be here
+    }
+  }
+
+  @Override
+  protected TestTopologyBuilder buildTopology(TestTopologyBuilder testTopologyBuilder) {
+    Builder streamletBuilder = Builder.newBuilder();
+    // Key and value extractors shared by both pipelines below.
+    SerializableFunction<String, String> getSeason =
+        StreamletWithKeybyCountAndReduce::seasonOf;
+    SerializableFunction<String, Integer> getNumberOfDays =
+        StreamletWithKeybyCountAndReduce::daysIn;
+
+    // Split the source text into month names and let each name through only once.
+    Streamlet<String> monthStreamlet = streamletBuilder
+        .newSource(() -> MONTHS)
+        .setName("months-text")
+        .flatMap((String text) -> Arrays.asList(text.split(" - ")))
+        .setName("months")
+        .filter(name -> incomingMonths.add(name.toLowerCase()))
+        .setName("unique-months");
+
+    // Count months per season
+    monthStreamlet
+        .keyBy(getSeason, getNumberOfDays)
+        .setName("key-by-season")
+        .countByKey(entry -> entry.getKey())
+        .setName("key-by-and-count")
+        .map(count -> String.format("%s: %d months", count.getKey(), count.getValue()))
+        .setName("to-string");
+
+    // Sum days per season
+    monthStreamlet
+        .<String, Integer>reduceByKey(getSeason, getNumberOfDays, StreamletReducers::sum)
+        .setName("sum-by-season")
+        .map(total -> String.format("%s: %d days", total.getKey(), total.getValue()))
+        .setName("to-string-2");
+
+    return (TestTopologyBuilder) ((BuilderImpl) streamletBuilder).build(testTopologyBuilder);
+  }
+
+  /** Creates the test topology from the command-line arguments and submits it. */
+  public static void main(String[] args) throws Exception {
+    Config conf = new Config();
+    new StreamletWithKeybyCountAndReduce(args).submit(conf);
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduceResults.json b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduceResults.json
new file mode 100644
index 0000000..d7215a6
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduceResults.json
@@ -0,0 +1 @@
+["fall: 1 months", "fall: 2 months", "fall: 3 months", "fall: 30 days", "fall: 61 days", "fall: 91 days", "spring: 1 months", "spring: 2 months", "spring: 3 months", "spring: 31 days", "spring: 61 days", "spring: 92 days", "summer: 1 months", "summer: 2 months", "summer: 3 months", "summer: 30 days", "summer: 61 days", "summer: 92 days", "winter: 1 months", "winter: 2 months", "winter: 3 months", "winter: 31 days", "winter: 59 days", "winter: 90 days"]
\ No newline at end of file
diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json
index bb15dce..06dcba4 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -27,6 +27,11 @@
"topologyName" : "IntegrationTest_ScalaStreamletWithMapAndFilterAndUnion",
"classPath" : "scala_streamlet_with_map_and_filter_and_union.ScalaStreamletWithMapAndFilterAndUnion",
"expectedResultRelativePath" : "scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json"
+ },
+ {
+ "topologyName": "IntegrationTest_ScalaStreamletWithKeybyCountAndReduce",
+ "classPath": "scala_streamlet_with_keyby_count_and_reduce.ScalaStreamletWithKeybyCountAndReduce",
+ "expectedResultRelativePath": "scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduceResults.json"
}
],
"javaTopologies": [
@@ -153,6 +158,11 @@
"topologyName": "IntegrationTest_StreamletWithSplitAndWithStream",
"classPath": "streamlet_with_split_and_with_stream.StreamletWithSplitAndWithStream",
"expectedResultRelativePath": "streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStreamResults.json"
+ },
+ {
+ "topologyName": "IntegrationTest_StreamletWithKeybyCountAndReduce",
+ "classPath": "streamlet_with_keyby_count_and_reduce.StreamletWithKeybyCountAndReduce",
+ "expectedResultRelativePath": "streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduceResults.json"
}
],
"pythonTopologies": [
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduce.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduce.scala
new file mode 100644
index 0000000..022b189
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduce.scala
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_test.topology.scala_streamlet_with_keyby_count_and_reduce
+
+import scala.collection.mutable.Set
+
+import org.apache.heron.api.Config
+import org.apache.heron.integration_test.common.{
+ AbstractTestTopology,
+ ScalaIntegrationTestBase
+}
+import org.apache.heron.integration_test.core.TestTopologyBuilder
+import org.apache.heron.streamlet.scala.Builder
+
+object ScalaStreamletWithKeybyCountAndReduce {
+  // Source text: the twelve month names separated by " - "
+  final val months = "january - february - march - april - may - june" +
+    " - july - august - september - october - november - december"
+
+  final val SPRING_MONTHS = List("march", "april", "may")
+  final val SUMMER_MONTHS = List("june", "july", "august")
+  final val FALL_MONTHS = List("september", "october", "november")
+  final val WINTER_MONTHS = List("december", "january", "february")
+
+  // Lower-cased month names already let through by the "unique-months" filter
+  val incomingMonths = Set[String]()
+
+  def main(args: Array[String]): Unit = {
+    val conf = new Config
+    new ScalaStreamletWithKeybyCountAndReduce(args).submit(conf)
+  }
+
+  /** Maps a month name to its season, or "really?" when the name is not recognized. */
+  def getSeason(month: String): String = {
+    if (SPRING_MONTHS.contains(month)) {
+      "spring"
+    } else if (SUMMER_MONTHS.contains(month)) {
+      "summer"
+    } else if (FALL_MONTHS.contains(month)) {
+      "fall"
+    } else if (WINTER_MONTHS.contains(month)) {
+      "winter"
+    } else {
+      "really?"
+    }
+  }
+
+  /** Returns the number of days in the named month, or -1 for an unknown name. */
+  def getNumberOfDays(month: String): Int = {
+    month match {
+      // Leap years are ignored; don't use this code in real projects
+      case "february" => 28
+      case "april" | "june" | "september" | "november" => 30
+      case "january" | "march" | "may" | "july" => 31
+      case "august" | "october" | "december" => 31
+      case _ => -1 // Shouldn't be here
+    }
+  }
+}
+
+/**
+ * Scala streamlet integration test covering the keyBy, countByKey and reduceByKey operations.
+ */
+class ScalaStreamletWithKeybyCountAndReduce(args: Array[String])
+    extends AbstractTestTopology(args)
+    with ScalaIntegrationTestBase {
+
+  import ScalaStreamletWithKeybyCountAndReduce._
+
+  override protected def buildTopology(
+      testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+    val streamletBuilder = Builder.newBuilder
+    // Split the source text into month names and let each name through only once
+    val monthStreamlet = streamletBuilder
+      .newSource(() => months)
+      .setName("months-text")
+      .flatMap[String]((text: String) => text.split(" - "))
+      .setName("months")
+      .filter((name: String) => incomingMonths.add(name.toLowerCase))
+      .setName("unique-months")
+
+    // Count months per season
+    val monthsPerSeason = monthStreamlet
+      .keyBy(getSeason(_), getNumberOfDays(_))
+      .setName("key-by-season")
+      .countByKey(_.getKey)
+      .setName("key-by-and-count")
+    monthsPerSeason
+      .map(count => s"${count.getKey}: ${count.getValue} months")
+      .setName("to-string")
+
+    // Sum days per season
+    val daysPerSeason = monthStreamlet
+      .reduceByKey[String, Int](getSeason(_), getNumberOfDays(_), (a: Int, b: Int) => a + b)
+      .setName("sum-by-season")
+    daysPerSeason
+      .map(total => s"${total.getKey}: ${total.getValue} days")
+      .setName("to-string-2")
+
+    build(testTopologyBuilder, streamletBuilder)
+  }
+}
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduceResults.json b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduceResults.json
new file mode 100644
index 0000000..91aeec5
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduceResults.json
@@ -0,0 +1,26 @@
+[
+ "fall: 1 months",
+ "fall: 2 months",
+ "fall: 3 months",
+ "fall: 30 days",
+ "fall: 61 days",
+ "fall: 91 days",
+ "spring: 1 months",
+ "spring: 2 months",
+ "spring: 3 months",
+ "spring: 31 days",
+ "spring: 61 days",
+ "spring: 92 days",
+ "summer: 1 months",
+ "summer: 2 months",
+ "summer: 3 months",
+ "summer: 30 days",
+ "summer: 61 days",
+ "summer: 92 days",
+ "winter: 1 months",
+ "winter: 2 months",
+ "winter: 3 months",
+ "winter: 31 days",
+ "winter: 59 days",
+ "winter: 90 days"
+]
\ No newline at end of file