You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/06/15 22:14:37 UTC

[beam] branch master updated: BEAM-10221: Add four test cases based on the Java equivalents for the Kotlin cookbook examples.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e7220f  BEAM-10221: Add in four tests cases of base on the java equivalent for the kotlin cookbook examples.
     new 34dff56  Merge pull request #12000 from BEAM-10221: Add in example tests cases of Kotlin cookbook
1e7220f is described below

commit 1e7220f32d70dd8a311a08cdb0b2beaba75ba23a
Author: perkss <st...@gmail.com>
AuthorDate: Fri Jun 12 16:34:15 2020 +0100

    BEAM-10221: Add four test cases based on the Java equivalents for the Kotlin cookbook examples.
---
 .../kotlin/cookbook/DistinctExampleTest.kt         |  60 ++++++++++++
 .../examples/kotlin/cookbook/FilterExamplesTest.kt |  83 ++++++++++++++++
 .../examples/kotlin/cookbook/JoinExamplesTest.kt   | 104 +++++++++++++++++++++
 .../kotlin/cookbook/MaxPerKeyExamplesTest.kt       |  86 +++++++++++++++++
 4 files changed, 333 insertions(+)

diff --git a/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/DistinctExampleTest.kt b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/DistinctExampleTest.kt
new file mode 100644
index 0000000..56702a3
--- /dev/null
+++ b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/DistinctExampleTest.kt
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import org.apache.beam.sdk.coders.StringUtf8Coder
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.ValidatesRunner
+import org.apache.beam.sdk.transforms.Create
+import org.apache.beam.sdk.transforms.Distinct
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+/** Unit tests for [Distinct]: duplicates collapse to one element, empty input stays empty. */
+@RunWith(JUnit4::class)
+class DistinctExampleTest {
+
+    private val pipeline: TestPipeline = TestPipeline.create()
+
+    /** Registers [pipeline] as a JUnit rule. */
+    @Rule
+    fun pipeline(): TestPipeline = pipeline
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testDistinct() {
+        val words = listOf("k1", "k5", "k5", "k2", "k1", "k2", "k3")
+        val unique = pipeline.apply(Create.of(words).withCoder(StringUtf8Coder.of())).apply(Distinct.create())
+        PAssert.that(unique).containsInAnyOrder("k1", "k5", "k2", "k3")
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testDistinctEmpty() {
+        // The coder is given explicitly because an empty Create has no elements to infer it from.
+        val none = pipeline.apply(Create.of(emptyList<String>()).withCoder(StringUtf8Coder.of()))
+        val unique = none.apply(Distinct.create())
+        PAssert.that(unique).empty()
+        pipeline.run().waitUntilFinish()
+    }
+}
\ No newline at end of file
diff --git a/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/FilterExamplesTest.kt b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/FilterExamplesTest.kt
new file mode 100644
index 0000000..d5cb544
--- /dev/null
+++ b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/FilterExamplesTest.kt
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableRow
+import org.apache.beam.examples.kotlin.cookbook.FilterExamples.FilterSingleMonthDataFn
+import org.apache.beam.examples.kotlin.cookbook.FilterExamples.ProjectionFn
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.ValidatesRunner
+import org.apache.beam.sdk.transforms.Create
+import org.apache.beam.sdk.transforms.ParDo
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+/**
+ * Unit tests for [FilterExamples]: [ProjectionFn] turns raw string-valued weather rows into typed
+ * rows, and [FilterSingleMonthDataFn] keeps only the rows of the requested month.
+ */
+@RunWith(JUnit4::class)
+class FilterExamplesTest {
+
+    private val row1 = rawRow(month = "6", day = "21", meanTemp = "85.3", tornado = true)
+    private val row2 = rawRow(month = "7", day = "20", meanTemp = "75.4", tornado = false)
+    private val row3 = rawRow(month = "6", day = "18", meanTemp = "45.3", tornado = true)
+
+    private val outRow1 = projectedRow(month = 6, day = 21, meanTemp = 85.3)
+    private val outRow2 = projectedRow(month = 7, day = 20, meanTemp = 75.4)
+    private val outRow3 = projectedRow(month = 6, day = 18, meanTemp = 45.3)
+
+    private val pipeline: TestPipeline = TestPipeline.create()
+
+    /** Registers [pipeline] as a JUnit rule. */
+    @Rule
+    fun pipeline(): TestPipeline = pipeline
+
+    /** Builds a 2014 input row whose numeric fields are stored as strings. */
+    private fun rawRow(month: String, day: String, meanTemp: String, tornado: Boolean): TableRow =
+            TableRow()
+                    .set("month", month)
+                    .set("day", day)
+                    .set("year", "2014")
+                    .set("mean_temp", meanTemp)
+                    .set("tornado", tornado)
+
+    /** Builds a 2014 row in the typed shape expected after projection. */
+    private fun projectedRow(month: Int, day: Int, meanTemp: Double): TableRow =
+            TableRow().set("year", 2014).set("month", month).set("day", day).set("mean_temp", meanTemp)
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testProjectionFn() {
+        val projected = pipeline.apply(Create.of(row1, row2, row3)).apply(ParDo.of(ProjectionFn()))
+        PAssert.that(projected).containsInAnyOrder(outRow1, outRow2, outRow3)
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testFilterSingleMonthDataFn() {
+        val july = pipeline.apply(Create.of(outRow1, outRow2, outRow3)).apply(ParDo.of(FilterSingleMonthDataFn(7)))
+        PAssert.that(july).containsInAnyOrder(outRow2)
+        pipeline.run().waitUntilFinish()
+    }
+}
\ No newline at end of file
diff --git a/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/JoinExamplesTest.kt b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/JoinExamplesTest.kt
new file mode 100644
index 0000000..8728a82
--- /dev/null
+++ b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/JoinExamplesTest.kt
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableRow
+import org.apache.beam.examples.kotlin.cookbook.JoinExamples.ExtractCountryInfoFn
+import org.apache.beam.examples.kotlin.cookbook.JoinExamples.ExtractEventDataFn
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.ValidatesRunner
+import org.apache.beam.sdk.transforms.Create
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+/** Unit tests for [JoinExamples]: the two extraction DoFns and the end-to-end join. */
+@RunWith(JUnit4::class)
+class JoinExamplesTest {
+
+    private val pipeline: TestPipeline = TestPipeline.create()
+
+    @Rule
+    fun pipeline(): TestPipeline = pipeline
+
+    @Test
+    fun testExtractEventDataFn() {
+        val output = pipeline.apply(Create.of(EVENT_ARRAY)).apply(ParDo.of(ExtractEventDataFn()))
+        PAssert.that(output).containsInAnyOrder(PARSED_EVENTS)
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    fun testExtractCountryInfoFn() {
+        val output = pipeline.apply(Create.of(CC_ARRAY)).apply(ParDo.of(ExtractCountryInfoFn()))
+        PAssert.that(output).containsInAnyOrder(PARSED_COUNTRY_CODES)
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testJoin() {
+        val input1 = pipeline.apply("CreateEvent", Create.of(EVENT_ARRAY))
+        val input2 = pipeline.apply("CreateCC", Create.of(CC_ARRAY))
+        val output: PCollection<String> = JoinExamples.joinEvents(input1, input2)
+        PAssert.that(output).containsInAnyOrder(*JOINED_EVENTS)
+        pipeline.run().waitUntilFinish()
+    }
+
+    companion object {
+        /** Builds an event row with country-code, date, actor and source-URL columns. */
+        private fun event(countryCode: String, date: String, actor: String, url: String): TableRow =
+                TableRow().set("ActionGeo_CountryCode", countryCode).set("SQLDATE", date)
+                        .set("Actor1Name", actor).set("SOURCEURL", url)
+
+        /** Event rows fed to [ExtractEventDataFn] and to [JoinExamples.joinEvents]. */
+        val EVENT_ARRAY: List<TableRow> = listOf(
+                event("VM", "20141212", "BANGKOK", "http://cnn.com"),
+                event("VM", "20141212", "LAOS", "http://www.chicagotribune.com"),
+                event("BE", "20141213", "AFGHANISTAN", "http://cnn.com"))
+
+        /** Country-code rows fed to [ExtractCountryInfoFn] and to [JoinExamples.joinEvents]. */
+        val CC_ARRAY: List<TableRow> = listOf(
+                TableRow().set("FIPSCC", "VM").set("HumanName", "Vietnam"),
+                TableRow().set("FIPSCC", "BE").set("HumanName", "Belgium"))
+
+        /** Expected output of [ExtractEventDataFn] for [EVENT_ARRAY]. */
+        private val PARSED_EVENTS = listOf(
+                KV.of("VM", "Date: 20141212, Actor1: LAOS, url: http://www.chicagotribune.com"),
+                KV.of("BE", "Date: 20141213, Actor1: AFGHANISTAN, url: http://cnn.com"),
+                KV.of("VM", "Date: 20141212, Actor1: BANGKOK, url: http://cnn.com"))
+
+        /** Expected output of [ExtractCountryInfoFn] for [CC_ARRAY]. */
+        private val PARSED_COUNTRY_CODES = listOf(KV.of("BE", "Belgium"), KV.of("VM", "Vietnam"))
+
+        /** Expected output of [JoinExamples.joinEvents] for [EVENT_ARRAY] joined with [CC_ARRAY]. */
+        val JOINED_EVENTS = arrayOf(
+                "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, "
+                        + "url: http://www.chicagotribune.com",
+                "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, "
+                        + "url: http://cnn.com",
+                "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, "
+                        + "url: http://cnn.com")
+    }
+}
\ No newline at end of file
diff --git a/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamplesTest.kt b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamplesTest.kt
new file mode 100644
index 0000000..c9f4ec3
--- /dev/null
+++ b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamplesTest.kt
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableRow
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.ValidatesRunner
+import org.apache.beam.sdk.transforms.Create
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+/**
+ * Unit tests for [MaxPerKeyExamples]: [MaxPerKeyExamples.ExtractTempFn] maps weather rows to
+ * (month, mean_temp) pairs, and [MaxPerKeyExamples.FormatMaxesFn] maps such pairs back to rows.
+ */
+@RunWith(JUnit4::class)
+class MaxPerKeyExamplesTest {
+
+    /** Raw input rows; numeric fields are stored as strings. */
+    private val testRows: List<TableRow> = listOf(
+            weatherRow(month = "6", day = "21", meanTemp = "85.3", tornado = true),
+            weatherRow(month = "7", day = "20", meanTemp = "75.4", tornado = false),
+            weatherRow(month = "6", day = "18", meanTemp = "45.3", tornado = true))
+
+    /**
+     * (month, mean_temp) pairs matching [testRows]. They are both the expected ExtractTempFn output
+     * and the FormatMaxesFn input, so the two tests share this one list.
+     */
+    private val testKvs: List<KV<Int, Double>> = ImmutableList.of(KV.of(6, 85.3), KV.of(6, 45.3), KV.of(7, 75.4))
+
+    // Rows expected from FormatMaxesFn for testKvs.
+    private val resultRow1 = TableRow().set("month", 6).set("max_mean_temp", 85.3)
+    private val resultRow2 = TableRow().set("month", 6).set("max_mean_temp", 45.3)
+    private val resultRow3 = TableRow().set("month", 7).set("max_mean_temp", 75.4)
+
+    private val pipeline: TestPipeline = TestPipeline.create()
+
+    /** Registers [pipeline] as a JUnit rule. */
+    @Rule
+    fun pipeline(): TestPipeline = pipeline
+
+    /** Builds a 2014 weather row whose numeric fields are stored as strings. */
+    private fun weatherRow(month: String, day: String, meanTemp: String, tornado: Boolean): TableRow =
+            TableRow().set("month", month).set("day", day).set("year", "2014")
+                    .set("mean_temp", meanTemp).set("tornado", tornado)
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testExtractTempFn() {
+        val results = pipeline.apply(Create.of(testRows))
+                .apply(ParDo.of<TableRow, KV<Int, Double>>(MaxPerKeyExamples.ExtractTempFn()))
+        PAssert.that(results).containsInAnyOrder(testKvs)
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testFormatMaxesFn() {
+        val results = pipeline.apply(Create.of(testKvs))
+                .apply(ParDo.of<KV<Int, Double>, TableRow>(MaxPerKeyExamples.FormatMaxesFn()))
+        PAssert.that(results).containsInAnyOrder(resultRow1, resultRow2, resultRow3)
+        pipeline.run().waitUntilFinish()
+    }
+}
\ No newline at end of file