You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/01/07 16:50:15 UTC

[incubator-heron] branch master updated: Add a new streamlet integration test with keyBy, countByKey and … (#3140)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 23f3ff2  Add a new streamlet integration test with keyBy, countByKey and … (#3140)
23f3ff2 is described below

commit 23f3ff246fcf968971d8d83fff656b5031375291
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Mon Jan 7 08:50:09 2019 -0800

    Add a new streamlet integration test with keyBy, countByKey and … (#3140)
    
    * Add a new Java streamlet integration test with keyBy, countByKey and reduceByKey
    
    * Scala version
---
 .../StreamletWithKeybyCountAndReduce.java          | 139 +++++++++++++++++++++
 .../StreamletWithKeybyCountAndReduceResults.json   |   1 +
 .../src/python/test_runner/resources/test.json     |  10 ++
 .../ScalaStreamletWithKeybyCountAndReduce.scala    | 118 +++++++++++++++++
 ...alaStreamletWithKeybyCountAndReduceResults.json |  26 ++++
 5 files changed, 294 insertions(+)

diff --git a/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduce.java b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduce.java
new file mode 100644
index 0000000..5e0c2b9
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduce.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_test.topology.streamlet_with_keyby_count_and_reduce;
+
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.integration_test.common.AbstractTestTopology;
+import org.apache.heron.integration_test.core.TestTopologyBuilder;
+import org.apache.heron.streamlet.Builder;
+import org.apache.heron.streamlet.SerializableFunction;
+import org.apache.heron.streamlet.Streamlet;
+import org.apache.heron.streamlet.StreamletReducers;
+import org.apache.heron.streamlet.impl.BuilderImpl;
+
+class StreamletWithKeybyCountAndReduce extends AbstractTestTopology {
+  private static final String MONTHS = "january - february - march - april - may - june"
+      + " - july - august - september - october - november - december";
+  private static final Set<String> SPRING_MONTHS =
+      new HashSet<>(Arrays.asList("march", "april", "may"));
+  private static final Set<String> SUMMER_MONTHS =
+      new HashSet<>(Arrays.asList("june", "july", "august"));
+  private static final Set<String> FALL_MONTHS =
+      new HashSet<>(Arrays.asList("september", "october", "november"));
+  private static final Set<String> WINTER_MONTHS =
+      new HashSet<>(Arrays.asList("december", "january", "february"));
+  private static Set<String> incomingMonths = new HashSet<>();
+
+  StreamletWithKeybyCountAndReduce(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  @Override
+  protected TestTopologyBuilder buildTopology(TestTopologyBuilder testTopologyBuilder) {
+    Builder streamletBuilder = Builder.newBuilder();
+
+    Streamlet<String> monthStreamlet = streamletBuilder
+        .newSource(() -> MONTHS)
+        .setName("months-text")
+        .flatMap((String m) -> Arrays.asList(m.split(" - ")))
+        .setName("months")
+        // Make sure each month is emitted only once
+        .filter((month) -> incomingMonths.add(month.toLowerCase()))
+        .setName("unique-months");
+
+    SerializableFunction<String, String> getSeason = month -> {
+      if (SPRING_MONTHS.contains(month)) {
+        return "spring";
+      } else if (SUMMER_MONTHS.contains(month)) {
+        return "summer";
+      } else if (FALL_MONTHS.contains(month)) {
+        return "fall";
+      } else if (WINTER_MONTHS.contains(month)) {
+        return "winter";
+      } else {
+        return "really?";
+      }
+    };
+
+    SerializableFunction<String, Integer> getNumberOfDays = month -> {
+      switch (month) {
+        case "january":
+          return 31;
+        case "february":
+          return 28;   // Dont use this code in real projects
+        case "march":
+          return 31;
+        case "april":
+          return 30;
+        case "may":
+          return 31;
+        case "june":
+          return 30;
+        case "july":
+          return 31;
+        case "august":
+          return 31;
+        case "september":
+          return 30;
+        case "october":
+          return 31;
+        case "november":
+          return 30;
+        case "december":
+          return 31;
+        default:
+          return -1;  // Shouldn't be here
+      }
+    };
+
+    // Count months per season
+    monthStreamlet
+        .keyBy(getSeason, getNumberOfDays)
+        .setName("key-by-season")
+        .countByKey(x -> x.getKey())
+        .setName("key-by-and-count")
+        .map(x -> String.format("%s: %d months", x.getKey(), x.getValue()))
+        .setName("to-string");
+
+    // Sum days per season
+    monthStreamlet
+        .<String, Integer>reduceByKey(getSeason, getNumberOfDays, StreamletReducers::sum)
+        .setName("sum-by-season")
+        .map(x -> String.format("%s: %d days", x.getKey(), x.getValue()))
+        .setName("to-string-2");
+
+    BuilderImpl streamletBuilderImpl = (BuilderImpl) streamletBuilder;
+    TestTopologyBuilder topology =
+        (TestTopologyBuilder) streamletBuilderImpl.build(testTopologyBuilder);
+
+    return topology;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Config conf = new Config();
+    StreamletWithKeybyCountAndReduce topology =
+        new StreamletWithKeybyCountAndReduce(args);
+    topology.submit(conf);
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduceResults.json b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduceResults.json
new file mode 100644
index 0000000..d7215a6
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduceResults.json
@@ -0,0 +1 @@
+["fall: 1 months", "fall: 2 months", "fall: 3 months", "fall: 30 days", "fall: 61 days", "fall: 91 days", "spring: 1 months", "spring: 2 months", "spring: 3 months", "spring: 31 days", "spring: 61 days", "spring: 92 days", "summer: 1 months", "summer: 2 months", "summer: 3 months", "summer: 30 days", "summer: 61 days", "summer: 92 days", "winter: 1 months", "winter: 2 months", "winter: 3 months", "winter: 31 days", "winter: 59 days", "winter: 90 days"]
\ No newline at end of file
diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json
index bb15dce..06dcba4 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -27,6 +27,11 @@
       "topologyName" : "IntegrationTest_ScalaStreamletWithMapAndFilterAndUnion",
       "classPath"    : "scala_streamlet_with_map_and_filter_and_union.ScalaStreamletWithMapAndFilterAndUnion",
       "expectedResultRelativePath" : "scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json"
+    },
+    {
+      "topologyName": "IntegrationTest_ScalaStreamletWithKeybyCountAndReduce",
+      "classPath": "scala_streamlet_with_keyby_count_and_reduce.ScalaStreamletWithKeybyCountAndReduce",
+      "expectedResultRelativePath": "scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduceResults.json"
     }
   ],
   "javaTopologies": [
@@ -153,6 +158,11 @@
       "topologyName": "IntegrationTest_StreamletWithSplitAndWithStream",
       "classPath": "streamlet_with_split_and_with_stream.StreamletWithSplitAndWithStream",
       "expectedResultRelativePath": "streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStreamResults.json"
+    },
+    {
+      "topologyName": "IntegrationTest_StreamletWithKeybyCountAndReduce",
+      "classPath": "streamlet_with_keyby_count_and_reduce.StreamletWithKeybyCountAndReduce",
+      "expectedResultRelativePath": "streamlet_with_keyby_count_and_reduce/StreamletWithKeybyCountAndReduceResults.json"
     }
   ],
   "pythonTopologies": [
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduce.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduce.scala
new file mode 100644
index 0000000..022b189
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduce.scala
@@ -0,0 +1,118 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+package org.apache.heron.integration_test.topology.scala_streamlet_with_keyby_count_and_reduce
+
+import scala.collection.mutable.Set
+
+import org.apache.heron.api.Config
+import org.apache.heron.integration_test.common.{
+  AbstractTestTopology,
+  ScalaIntegrationTestBase
+}
+import org.apache.heron.integration_test.core.TestTopologyBuilder
+import org.apache.heron.streamlet.scala.Builder
+
+object ScalaStreamletWithKeybyCountAndReduce {
+
+  /** Source text: all twelve months joined by " - ". */
+  final val months = "january - february - march - april - may - june" +
+    " - july - august - september - october - november - december"
+
+  // Month -> season membership (December-February is winter).
+  final val SPRING_MONTHS = List("march", "april", "may")
+  final val SUMMER_MONTHS = List("june", "july", "august")
+  final val FALL_MONTHS = List("september", "october", "november")
+  final val WINTER_MONTHS = List("december", "january", "february")
+
+  /** Months already let through by the de-duplicating filter (a mutable set). */
+  val incomingMonths = Set[String]()
+
+  /** Builds the test topology from `args` and submits it with a default [[Config]]. */
+  def main(args: Array[String]): Unit = {
+    val config = new Config
+    new ScalaStreamletWithKeybyCountAndReduce(args).submit(config)
+  }
+
+  /** Returns the season name for a lower-case month, or "really?" for anything else. */
+  def getSeason(month: String): String =
+    if (SPRING_MONTHS.contains(month)) "spring"
+    else if (SUMMER_MONTHS.contains(month)) "summer"
+    else if (FALL_MONTHS.contains(month)) "fall"
+    else if (WINTER_MONTHS.contains(month)) "winter"
+    else "really?"
+
+  /** Returns the number of days in a lower-case month, or -1 for anything else. */
+  def getNumberOfDays(month: String): Int = month match {
+    case "february" => 28 // Ignores leap years: only valid for this fixed test input.
+    case "april" | "june" | "september" | "november" => 30
+    case "january" | "march" | "may" | "july" | "august" | "october" | "december" => 31
+    case _ => -1 // Not expected for the fixed `months` input.
+  }
+}
+
+/**
+  * Scala Streamlet integration test covering the keyBy, countByKey and reduceByKey operations.
+  *
+  * The de-duplicated month stream feeds two branches: one emits the running number of months
+  * per season, the other the running number of days per season.
+  */
+class ScalaStreamletWithKeybyCountAndReduce(args: Array[String])
+    extends AbstractTestTopology(args)
+    with ScalaIntegrationTestBase {
+
+  import ScalaStreamletWithKeybyCountAndReduce._
+
+  override protected def buildTopology(
+      testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+    val builder = Builder.newBuilder
+
+    // Split the source text into months and let each month through only once.
+    val uniqueMonths = builder
+      .newSource(() => months)
+      .setName("months-text")
+      .flatMap[String]((text: String) => text.split(" - "))
+      .setName("months")
+      .filter((m: String) => incomingMonths.add(m.toLowerCase))
+      .setName("unique-months")
+
+    // Branch 1: months per season, e.g. "spring: 3 months".
+    uniqueMonths
+      .keyBy((m: String) => getSeason(m), (m: String) => getNumberOfDays(m))
+      .setName("key-by-season")
+      .countByKey(kv => kv.getKey)
+      .setName("key-by-and-count")
+      .map(kv => s"${kv.getKey}: ${kv.getValue} months")
+      .setName("to-string")
+
+    // Branch 2: days per season, e.g. "spring: 92 days".
+    uniqueMonths
+      .reduceByKey[String, Int](
+        (m: String) => getSeason(m),
+        (m: String) => getNumberOfDays(m),
+        (a: Int, b: Int) => a + b)
+      .setName("sum-by-season")
+      .map(kv => s"${kv.getKey}: ${kv.getValue} days")
+      .setName("to-string-2")
+
+    build(testTopologyBuilder, builder)
+  }
+}
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduceResults.json b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduceResults.json
new file mode 100644
index 0000000..91aeec5
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_keyby_count_and_reduce/ScalaStreamletWithKeybyCountAndReduceResults.json
@@ -0,0 +1,26 @@
+[
+  "fall: 1 months",
+  "fall: 2 months",
+  "fall: 3 months",
+  "fall: 30 days",
+  "fall: 61 days",
+  "fall: 91 days",
+  "spring: 1 months",
+  "spring: 2 months",
+  "spring: 3 months",
+  "spring: 31 days",
+  "spring: 61 days",
+  "spring: 92 days",
+  "summer: 1 months",
+  "summer: 2 months",
+  "summer: 3 months",
+  "summer: 30 days",
+  "summer: 61 days",
+  "summer: 92 days",
+  "winter: 1 months",
+  "winter: 2 months",
+  "winter: 3 months",
+  "winter: 31 days",
+  "winter: 59 days",
+  "winter: 90 days"
+]
\ No newline at end of file