You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/06/15 22:14:37 UTC
[beam] branch master updated: BEAM-10221: Add in four tests cases
of base on the java equivalent for the kotlin cookbook examples.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1e7220f BEAM-10221: Add in four tests cases of base on the java equivalent for the kotlin cookbook examples.
new 34dff56 Merge pull request #12000 from BEAM-10221: Add in example tests cases of Kotlin cookbook
1e7220f is described below
commit 1e7220f32d70dd8a311a08cdb0b2beaba75ba23a
Author: perkss <st...@gmail.com>
AuthorDate: Fri Jun 12 16:34:15 2020 +0100
BEAM-10221: Add in four tests cases of base on the java equivalent for the kotlin cookbook examples.
---
.../kotlin/cookbook/DistinctExampleTest.kt | 60 ++++++++++++
.../examples/kotlin/cookbook/FilterExamplesTest.kt | 83 ++++++++++++++++
.../examples/kotlin/cookbook/JoinExamplesTest.kt | 104 +++++++++++++++++++++
.../kotlin/cookbook/MaxPerKeyExamplesTest.kt | 86 +++++++++++++++++
4 files changed, 333 insertions(+)
diff --git a/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/DistinctExampleTest.kt b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/DistinctExampleTest.kt
new file mode 100644
index 0000000..56702a3
--- /dev/null
+++ b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/DistinctExampleTest.kt
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import org.apache.beam.sdk.coders.StringUtf8Coder
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.ValidatesRunner
+import org.apache.beam.sdk.transforms.Create
+import org.apache.beam.sdk.transforms.Distinct
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+/** Unit tests for [Distinct], covering an input with repeated elements and an empty input. */
+@RunWith(JUnit4::class)
+class DistinctExampleTest {
+
+    private val pipeline: TestPipeline = TestPipeline.create()
+
+    @Rule
+    fun pipeline(): TestPipeline = pipeline
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testDistinct() {
+        // Every repeated key is expected exactly once in the output, in any order.
+        val source = Create.of(listOf("k1", "k5", "k5", "k2", "k1", "k2", "k3")).withCoder(StringUtf8Coder.of())
+        val unique = pipeline.apply(source).apply(Distinct.create())
+        PAssert.that(unique).containsInAnyOrder("k1", "k5", "k2", "k3")
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testDistinctEmpty() {
+        // An empty input is expected to produce an empty output.
+        val source = Create.of(listOf<String>()).withCoder(StringUtf8Coder.of())
+        val unique = pipeline.apply(source).apply(Distinct.create())
+        PAssert.that(unique).empty()
+        pipeline.run().waitUntilFinish()
+    }
+}
\ No newline at end of file
diff --git a/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/FilterExamplesTest.kt b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/FilterExamplesTest.kt
new file mode 100644
index 0000000..d5cb544
--- /dev/null
+++ b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/FilterExamplesTest.kt
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableRow
+import org.apache.beam.examples.kotlin.cookbook.FilterExamples.FilterSingleMonthDataFn
+import org.apache.beam.examples.kotlin.cookbook.FilterExamples.ProjectionFn
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.ValidatesRunner
+import org.apache.beam.sdk.transforms.Create
+import org.apache.beam.sdk.transforms.ParDo
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+/**
+ * Unit tests for [FilterExamples], exercising [ProjectionFn] and [FilterSingleMonthDataFn].
+ */
+@RunWith(JUnit4::class)
+class FilterExamplesTest {
+
+    // Raw rows: all values are strings except the boolean "tornado"; the year is always "2014".
+    private val row1 = rawRow(month = "6", day = "21", meanTemp = "85.3", tornado = true)
+    private val row2 = rawRow(month = "7", day = "20", meanTemp = "75.4", tornado = false)
+    private val row3 = rawRow(month = "6", day = "18", meanTemp = "45.3", tornado = true)
+
+    // Numeric counterparts of the raw rows above, without the "tornado" entry.
+    private val outRow1 = numericRow(month = 6, day = 21, meanTemp = 85.3)
+    private val outRow2 = numericRow(month = 7, day = 20, meanTemp = 75.4)
+    private val outRow3 = numericRow(month = 6, day = 18, meanTemp = 45.3)
+
+    private val pipeline: TestPipeline = TestPipeline.create()
+
+    @Rule
+    fun pipeline(): TestPipeline = pipeline
+
+    /** Builds a raw row, setting the keys in the order month, day, year, mean_temp, tornado. */
+    private fun rawRow(month: String, day: String, meanTemp: String, tornado: Boolean): TableRow =
+        TableRow().set("month", month).set("day", day).set("year", "2014")
+            .set("mean_temp", meanTemp).set("tornado", tornado)
+
+    /** Builds a numeric row, setting the keys in the order year, month, day, mean_temp. */
+    private fun numericRow(month: Int, day: Int, meanTemp: Double): TableRow =
+        TableRow().set("year", 2014).set("month", month).set("day", day).set("mean_temp", meanTemp)
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testProjectionFn() {
+        // The three raw rows are expected to come out as their numeric counterparts.
+        val projected = pipeline.apply(Create.of(row1, row2, row3))
+            .apply(ParDo.of(ProjectionFn()))
+        PAssert.that(projected).containsInAnyOrder(outRow1, outRow2, outRow3)
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testFilterSingleMonthDataFn() {
+        // Only the row whose month is 7 is expected to remain.
+        val kept = pipeline.apply(Create.of(outRow1, outRow2, outRow3))
+            .apply(ParDo.of(FilterSingleMonthDataFn(7)))
+        PAssert.that(kept).containsInAnyOrder(outRow2)
+        pipeline.run().waitUntilFinish()
+    }
+}
\ No newline at end of file
diff --git a/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/JoinExamplesTest.kt b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/JoinExamplesTest.kt
new file mode 100644
index 0000000..8728a82
--- /dev/null
+++ b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/JoinExamplesTest.kt
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableRow
+import org.apache.beam.examples.kotlin.cookbook.JoinExamples.ExtractCountryInfoFn
+import org.apache.beam.examples.kotlin.cookbook.JoinExamples.ExtractEventDataFn
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.ValidatesRunner
+import org.apache.beam.sdk.transforms.Create
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+/** Unit tests for [JoinExamples]: its two extraction DoFns and [JoinExamples.joinEvents]. */
+@RunWith(JUnit4::class)
+class JoinExamplesTest {
+
+    companion object {
+        /** Builds an event row, setting the keys in the order country code, date, actor, URL. */
+        private fun event(countryCode: String, sqlDate: String, actor: String, url: String): TableRow =
+            TableRow().set("ActionGeo_CountryCode", countryCode).set("SQLDATE", sqlDate)
+                .set("Actor1Name", actor).set("SOURCEURL", url)
+
+        /** Builds a country row from its "FIPSCC" and "HumanName" values. */
+        private fun country(code: String, name: String): TableRow =
+            TableRow().set("FIPSCC", code).set("HumanName", name)
+
+        // Two events carry country code VM, one carries BE.
+        private val row1 = event("VM", "20141212", "BANGKOK", "http://cnn.com")
+        private val row2 = event("VM", "20141212", "LAOS", "http://www.chicagotribune.com")
+        private val row3 = event("BE", "20141213", "AFGHANISTAN", "http://cnn.com")
+        val EVENT_ARRAY: List<TableRow> = listOf(row1, row2, row3)
+        private val PARSED_EVENTS = listOf(
+            KV.of("VM", "Date: 20141212, Actor1: LAOS, url: http://www.chicagotribune.com"),
+            KV.of("BE", "Date: 20141213, Actor1: AFGHANISTAN, url: http://cnn.com"),
+            KV.of("VM", "Date: 20141212, Actor1: BANGKOK, url: http://cnn.com")
+        )
+        private val PARSED_COUNTRY_CODES = listOf(KV.of("BE", "Belgium"), KV.of("VM", "Vietnam"))
+
+        private val cc1 = country("VM", "Vietnam")
+        private val cc2 = country("BE", "Belgium")
+        val CC_ARRAY = listOf(cc1, cc2)
+
+        val JOINED_EVENTS = arrayOf(
+            "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, " +
+                "url: http://www.chicagotribune.com",
+            "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, " +
+                "url: http://cnn.com",
+            "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, " +
+                "url: http://cnn.com"
+        )
+    }
+
+    private val pipeline: TestPipeline = TestPipeline.create()
+
+    @Rule
+    fun pipeline(): TestPipeline = pipeline
+
+    @Test
+    fun testExtractEventDataFn() {
+        val parsed = pipeline.apply(Create.of(EVENT_ARRAY)).apply(ParDo.of(ExtractEventDataFn()))
+        PAssert.that(parsed).containsInAnyOrder(PARSED_EVENTS)
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    fun testExtractCountryInfoFn() {
+        val parsed = pipeline.apply(Create.of(CC_ARRAY)).apply(ParDo.of(ExtractCountryInfoFn()))
+        PAssert.that(parsed).containsInAnyOrder(PARSED_COUNTRY_CODES)
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testJoin() {
+        val events = pipeline.apply("CreateEvent", Create.of(EVENT_ARRAY))
+        val countries = pipeline.apply("CreateCC", Create.of(CC_ARRAY))
+        val joined: PCollection<String> = JoinExamples.joinEvents(events, countries)
+        PAssert.that(joined).containsInAnyOrder(*JOINED_EVENTS)
+        pipeline.run().waitUntilFinish()
+    }
+}
\ No newline at end of file
diff --git a/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamplesTest.kt b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamplesTest.kt
new file mode 100644
index 0000000..c9f4ec3
--- /dev/null
+++ b/examples/kotlin/src/test/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamplesTest.kt
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableRow
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.ValidatesRunner
+import org.apache.beam.sdk.transforms.Create
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+/** Unit tests for the ExtractTempFn and FormatMaxesFn DoFns of [MaxPerKeyExamples]. */
+@RunWith(JUnit4::class)
+class MaxPerKeyExamplesTest {
+
+    // Raw rows: all values are strings except the boolean "tornado"; the year is always "2014".
+    private val row1 = rawRow(month = "6", day = "21", meanTemp = "85.3", tornado = true)
+    private val row2 = rawRow(month = "7", day = "20", meanTemp = "75.4", tornado = false)
+    private val row3 = rawRow(month = "6", day = "18", meanTemp = "45.3", tornado = true)
+    private val testRows: List<TableRow> = listOf(row1, row2, row3)
+
+    // Month / mean-temperature pairs taken from the three raw rows above.
+    private val kv1 = KV.of(6, 85.3)
+    private val kv2 = KV.of(6, 45.3)
+    private val kv3 = KV.of(7, 75.4)
+    private val testKvs: List<KV<Int, Double>> = listOf(kv1, kv2, kv3)
+
+    // One result row per pair above, keyed by "month" and "max_mean_temp".
+    private val resultRow1 = resultRow(month = 6, maxMeanTemp = 85.3)
+    private val resultRow2 = resultRow(month = 6, maxMeanTemp = 45.3)
+    private val resultRow3 = resultRow(month = 7, maxMeanTemp = 75.4)
+
+    private val pipeline: TestPipeline = TestPipeline.create()
+
+    @Rule
+    fun pipeline(): TestPipeline = pipeline
+
+    /** Builds a raw row, setting the keys in the order month, day, year, mean_temp, tornado. */
+    private fun rawRow(month: String, day: String, meanTemp: String, tornado: Boolean): TableRow =
+        TableRow().set("month", month).set("day", day).set("year", "2014")
+            .set("mean_temp", meanTemp).set("tornado", tornado)
+
+    /** Builds a result row, setting "month" first and "max_mean_temp" second. */
+    private fun resultRow(month: Int, maxMeanTemp: Double): TableRow =
+        TableRow().set("month", month).set("max_mean_temp", maxMeanTemp)
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testExtractTempFn() {
+        val rows = pipeline.apply(Create.of(testRows))
+        val temps = rows.apply(ParDo.of<TableRow, KV<Int, Double>>(MaxPerKeyExamples.ExtractTempFn()))
+        PAssert.that(temps).containsInAnyOrder(ImmutableList.of(kv1, kv2, kv3))
+        pipeline.run().waitUntilFinish()
+    }
+
+    @Test
+    @Category(ValidatesRunner::class)
+    fun testFormatMaxesFn() {
+        val pairs = pipeline.apply(Create.of(testKvs))
+        val formatted = pairs.apply(ParDo.of<KV<Int, Double>, TableRow>(MaxPerKeyExamples.FormatMaxesFn()))
+        PAssert.that(formatted).containsInAnyOrder(resultRow1, resultRow2, resultRow3)
+        pipeline.run().waitUntilFinish()
+    }
+}
\ No newline at end of file