You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by ka...@apache.org on 2018/04/13 23:37:31 UTC
[incubator-heron] branch master updated: [Streamlet Scala API] Add
Scala Streamlet Integration Tests Part II (#2861)
This is an automated email from the ASF dual-hosted git repository.
karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new f444372 [Streamlet Scala API] Add Scala Streamlet Integration Tests Part II (#2861)
f444372 is described below
commit f444372de2749c8a6953ff98a5178550ec026cd8
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Sat Apr 14 00:37:28 2018 +0100
[Streamlet Scala API] Add Scala Streamlet Integration Tests Part II (#2861)
* Scala Streamlet Integration Tests Part II
* Merge conflicts have been fixed
---
.../src/python/test_runner/resources/test.json | 5 ++
.../common/ScalaIntegrationTestBase.scala | 2 +-
.../ScalaStreamletWithFilterAndTransform.scala | 1 -
...reamletWithMapAndFlatMapAndFilterAndClone.scala | 74 ++++++++++++++++++++++
...tWithMapAndFlatMapAndFilterAndCloneResults.json | 1 +
5 files changed, 81 insertions(+), 2 deletions(-)
diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json
index 7d7cf2b..1174154 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -12,6 +12,11 @@
"topologyName" : "IntegrationTest_ScalaStreamletWithFilterAndTransform",
"classPath" : "scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform",
"expectedResultRelativePath" : "scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json"
+ },
+ {
+ "topologyName" : "IntegrationTest_ScalaStreamletWithMapAndFlatMapAndFilterAndClone",
+ "classPath" : "scala_streamlet_with_map_and_flatmap_and_filter_and_clone.ScalaStreamletWithMapAndFlatMapAndFilterAndClone",
+ "expectedResultRelativePath" : "scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json"
}
],
"javaTopologies": [
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala b/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
index 0b36d6a..915233f 100644
--- a/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
+++ b/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
@@ -21,7 +21,7 @@ import org.apache.heron.streamlet.scala.impl.BuilderImpl
/**
* Scala Integration Test Base
*/
-trait ScalaIntegrationTestBase extends Serializable {
+trait ScalaIntegrationTestBase {
protected def build(testTopologyBuilder: TestTopologyBuilder,
streamletBuilder: Builder): TestTopologyBuilder = {
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
index 023f57e..7c1d7e8 100644
--- a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
@@ -38,7 +38,6 @@ object ScalaStreamletWithFilterAndTransform {
/**
* Scala Streamlet Integration Test
*/
-@SerialVersionUID(-7280407024398984674L)
class ScalaStreamletWithFilterAndTransform(args: Array[String])
extends AbstractTestTopology(args)
with ScalaIntegrationTestBase {
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
new file mode 100644
index 0000000..2742b99
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
@@ -0,0 +1,74 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package org.apache.heron.integration_test.topology.scala_streamlet_with_map_and_flatmap_and_filter_and_clone
+
+import scala.collection.mutable.Set
+
+import org.apache.heron.api.Config
+import org.apache.heron.integration_test.common.{
+ AbstractTestTopology,
+ ScalaIntegrationTestBase
+}
+import org.apache.heron.integration_test.core.TestTopologyBuilder
+import org.apache.heron.streamlet.scala.Builder
+
+object ScalaStreamletWithMapAndFlatMapAndFilterAndClone {
+ val months = "january - february - march - april - may - june" +
+ " - july - august - september - october - november - december"
+
+ val summerMonths =
+ List("june", "july", "august")
+
+ val incomingMonths = Set[String]()
+
+ def main(args: Array[String]): Unit = {
+ val conf = new Config
+ val topology = new ScalaStreamletWithMapAndFlatMapAndFilterAndClone(args)
+ topology.submit(conf)
+ }
+}
+
+class ScalaStreamletWithMapAndFlatMapAndFilterAndClone(args: Array[String])
+ extends AbstractTestTopology(args)
+ with ScalaIntegrationTestBase {
+
+ import ScalaStreamletWithMapAndFlatMapAndFilterAndClone._
+
+ override protected def buildTopology(
+ testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+ val streamletBuilder = Builder.newBuilder
+
+ val clonedStreamlet = streamletBuilder
+ .newSource(() => months)
+ .setName("months-text")
+ .flatMap[String]((m: String) => m.split(" - "))
+ .setName("months")
+ .filter((month: String) =>
+ (summerMonths.contains(month.toLowerCase)
+ && incomingMonths.add(month.toLowerCase)))
+ .setName("summer-months")
+ .map[String]((word: String) => word.substring(0, 3))
+ .setName("summer-months-with-short-name")
+ .clone(numClones = 2)
+
+ //Returns Summer Months with "_2018" suffix
+ clonedStreamlet(0).map[String](month => month + "_2018")
+
+ //Returns Summer Months with Uppercase
+ clonedStreamlet(1).map[String](_.toUpperCase)
+
+ build(testTopologyBuilder, streamletBuilder)
+ }
+
+}
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json
new file mode 100644
index 0000000..6ececf2
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json
@@ -0,0 +1 @@
+["AUG", "JUL", "JUN", "aug_2018", "jul_2018", "jun_2018"]
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
karthikz@apache.org.