You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/11/26 22:50:30 UTC

[GitHub] nwangtw closed pull request #3121: [Scala Streamlet API] Add Integration Test for Source and Union Operations

nwangtw closed pull request #3121: [Scala Streamlet API] Add Integration Test for Source and Union Operations
URL: https://github.com/apache/incubator-heron/pull/3121
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/heron/api/src/java/org/apache/heron/streamlet/SerializableBiFunction.java b/heron/api/src/java/org/apache/heron/streamlet/SerializableBiFunction.java
index a6812218c3..b2fee80367 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/SerializableBiFunction.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/SerializableBiFunction.java
@@ -25,7 +25,7 @@
 
 /**
  * All user supplied transformation functions have to be serializable.
- * Thus all Strealmet transformation definitions take Serializable
+ * Thus all Streamlet transformation definitions take Serializable
  * Functions as their input. We simply decorate java.util. function
  * definitions with a Serializable tag to ensure that any supplied
  * lambda functions automatically become serializable.
diff --git a/heron/api/src/java/org/apache/heron/streamlet/SerializableBinaryOperator.java b/heron/api/src/java/org/apache/heron/streamlet/SerializableBinaryOperator.java
index 4441d4922e..c8586f5fff 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/SerializableBinaryOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/SerializableBinaryOperator.java
@@ -22,7 +22,7 @@
 
 /**
  * All user supplied transformation functions have to be serializable.
- * Thus all Strealmet transformation definitions take Serializable
+ * Thus all Streamlet transformation definitions take Serializable
  * Functions as their input. We simply decorate java.util. function
  * definitions with a Serializable tag to ensure that any supplied
  * lambda functions automatically become serializable.
diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json
index 62cc92f28a..bb15dcea1d 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -22,6 +22,11 @@
       "topologyName": "IntegrationTest_ScalaStreamletWithSplitAndWithStream",
       "classPath": "scala_streamlet_with_split_and_with_stream.ScalaStreamletWithSplitAndWithStream",
       "expectedResultRelativePath": "scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStreamResults.json"
+    },
+    {
+      "topologyName" : "IntegrationTest_ScalaStreamletWithMapAndFilterAndUnion",
+      "classPath"    : "scala_streamlet_with_map_and_filter_and_union.ScalaStreamletWithMapAndFilterAndUnion",
+      "expectedResultRelativePath" : "scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json"
     }
   ],
   "javaTopologies": [
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/common/ClassicalMusicDataset.scala b/integration_test/src/scala/org/apache/heron/integration_test/common/ClassicalMusicDataset.scala
new file mode 100644
index 0000000000..29388cc7ea
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/common/ClassicalMusicDataset.scala
@@ -0,0 +1,52 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+package org.apache.heron.integration_test.common
+
+case class ClassicalMusic(composer: String,
+                          title: String,
+                          year: Int,
+                          keyword: String)
+
+/**
+  * Common Dataset to be used by Scala Streamlet Integration Tests
+  */
+object ClassicalMusicDataset {
+
+  val firstClassicalMusicList = List(
+    ClassicalMusic("Bach", "Bourrée In E Minor", 1717, "guitar"),
+    ClassicalMusic("Vivaldi", "Four Seasons: Winter", 1723, "rousing"),
+    ClassicalMusic("Bach", "Air On The G String", 1723, "light"),
+    ClassicalMusic("Mozart", "Symphony No. 40: I", 1788, "seductive"),
+    ClassicalMusic("Beethoven", "Symphony No. 9: Ode To Joy", 1824, "joyful"),
+    ClassicalMusic("Bizet", "Carmen: Habanera", 1875, "seductive")
+  )
+
+  val secondClassicalMusicList = List(
+    ClassicalMusic("Handel", "Water Music: Alla Hornpipe", 1717, "formal"),
+    ClassicalMusic("Vivaldi", "Four Seasons: Spring", 1723, "formal"),
+    ClassicalMusic("Bach",
+                   "Cantata 147: Jesu, Joy Of Man's Desiring",
+                   1723,
+                   "wedding"),
+    ClassicalMusic("Mozart", "Piano Sonata No. 16", 1788, "piano"),
+    ClassicalMusic("Beethoven", "Symphony No. 9: II", 1824, "powerful"),
+    ClassicalMusic("Tchaikovsky", "Piano Concerto No. 1", 1875, "piano")
+  )
+
+}
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala b/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
index d68180bda9..0b9f85ac5e 100644
--- a/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
+++ b/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
@@ -1,21 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
 package org.apache.heron.integration_test.common
 
 import org.apache.heron.integration_test.core.TestTopologyBuilder
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
index 3e73f2a8d4..b2ccbae921 100644
--- a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
@@ -1,21 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
 package org.apache.heron.integration_test.topology.scala_streamlet_with_filter_and_transform
 
 import java.util.concurrent.atomic.AtomicInteger
@@ -28,9 +28,7 @@ import org.apache.heron.integration_test.common.{
   AbstractTestTopology,
   ScalaIntegrationTestBase
 }
-import org.apache.heron.streamlet.scala.{
-  Builder, SerializableTransformer
-}
+import org.apache.heron.streamlet.scala.{Builder, SerializableTransformer}
 
 object ScalaStreamletWithFilterAndTransform {
   def main(args: Array[String]): Unit = {
@@ -41,7 +39,7 @@ object ScalaStreamletWithFilterAndTransform {
 }
 
 /**
-  * Scala Streamlet Integration Test
+  * Scala Streamlet Integration Test by covering source, filter, transform operations.
   */
 class ScalaStreamletWithFilterAndTransform(args: Array[String])
     extends AbstractTestTopology(args)
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnion.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnion.scala
new file mode 100644
index 0000000000..9cfe6a1731
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnion.scala
@@ -0,0 +1,102 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+package org.apache.heron.integration_test.topology.scala_streamlet_with_map_and_filter_and_union
+
+import org.apache.heron.api.Config
+import org.apache.heron.integration_test.common.{
+  AbstractTestTopology,
+  ClassicalMusic,
+  ScalaIntegrationTestBase
+}
+import org.apache.heron.integration_test.common.ClassicalMusicDataset._
+import org.apache.heron.integration_test.core.TestTopologyBuilder
+import org.apache.heron.streamlet.Context
+import org.apache.heron.streamlet.scala.{Builder, Source}
+
+import scala.collection.mutable.Set
+
+object ScalaStreamletWithMapAndFilterAndUnion {
+
+  val filterSet = Set[String]()
+
+  def main(args: Array[String]): Unit = {
+    val conf = new Config
+    val topology = new ScalaStreamletWithMapAndFilterAndUnion(args)
+    topology.submit(conf)
+  }
+
+}
+
+/**
+  * Scala Streamlet Integration Test by covering source, map, filter and union operations.
+  */
+class ScalaStreamletWithMapAndFilterAndUnion(args: Array[String])
+    extends AbstractTestTopology(args)
+    with ScalaIntegrationTestBase {
+
+  import ScalaStreamletWithMapAndFilterAndUnion._
+
+  override protected def buildTopology(
+      testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+    val streamletBuilder = Builder.newBuilder
+    val classicalMusics1 =
+      streamletBuilder
+        .newSource(new ClassicalMusicSource(firstClassicalMusicList))
+        .setName("classical-musics")
+        .map(
+          classicalMusic =>
+            new ClassicalMusic(classicalMusic.composer.toUpperCase(),
+                               classicalMusic.title.toUpperCase(),
+                               classicalMusic.year,
+                               classicalMusic.keyword.toUpperCase()))
+        .setName("classical-musics-with-uppercase")
+
+    val classicalMusics2 = streamletBuilder
+      .newSource(new ClassicalMusicSource(secondClassicalMusicList))
+      .setName("classical-musics-2")
+
+    val unionStreamlet = classicalMusics1
+      .union(classicalMusics2)
+      .setName("classical-musics-union")
+
+    unionStreamlet
+      .map[String](classicalMusic =>
+        s"${classicalMusic.composer}-${classicalMusic.year}")
+      .setName("classical-musics-with-composer-and-year")
+      .filter(filterSet.add(_))
+      .setName("filtered-classical-musics")
+
+    build(testTopologyBuilder, streamletBuilder)
+  }
+
+}
+
+private class ClassicalMusicSource(classicalMusics: List[ClassicalMusic])
+    extends Source[ClassicalMusic] {
+
+  var list = List[ClassicalMusic]()
+
+  override def setup(context: Context): Unit = {
+    list = classicalMusics
+  }
+
+  override def get(): Iterable[ClassicalMusic] = list
+
+  override def cleanup(): Unit = ???
+}
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json
new file mode 100644
index 0000000000..0eab457952
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_filter_and_union/ScalaStreamletWithMapAndFilterAndUnionResults.json
@@ -0,0 +1 @@
+["BACH-1717", "BACH-1723", "BEETHOVEN-1824", "BIZET-1875", "Bach-1723", "Beethoven-1824", "Handel-1717", "MOZART-1788", "Mozart-1788", "Tchaikovsky-1875", "VIVALDI-1723", "Vivaldi-1723"]
\ No newline at end of file
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
index bb77987d70..884ba1f4a7 100644
--- a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
@@ -1,21 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
 package org.apache.heron.integration_test.topology.scala_streamlet_with_map_and_flatmap_and_filter_and_clone
 
 import scala.collection.mutable.Set
@@ -44,6 +44,9 @@ object ScalaStreamletWithMapAndFlatMapAndFilterAndClone {
   }
 }
 
+/**
+  * Scala Streamlet Integration Test by covering source, map, flatMap, filter and clone operations.
+  */
 class ScalaStreamletWithMapAndFlatMapAndFilterAndClone(args: Array[String])
     extends AbstractTestTopology(args)
     with ScalaIntegrationTestBase {
@@ -52,6 +55,7 @@ class ScalaStreamletWithMapAndFlatMapAndFilterAndClone(args: Array[String])
 
   override protected def buildTopology(
       testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+
     val streamletBuilder = Builder.newBuilder
 
     val clonedStreamlet = streamletBuilder


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services