Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/14 20:34:34 UTC
[beam] branch master updated: [BEAM-6493] : Add cookbook and snippets examples in Kotlin (#8439)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b988307 [BEAM-6493] : Add cookbook and snippets examples in Kotlin (#8439)
b988307 is described below
commit b9883076487334d5abd34e1db42650c095c0b4d3
Author: Harshit Dwivedi <47...@users.noreply.github.com>
AuthorDate: Tue May 14 13:34:20 2019 -0700
[BEAM-6493] : Add cookbook and snippets examples in Kotlin (#8439)
* feat : convert snippets to kotlin
Signed-off-by: Harshit Dwivedi <ha...@pitech.app>
* add kotlin samples for cookbook
Signed-off-by: Harshit Dwivedi <ha...@pitech.app>
* fix : findbugs errors
Signed-off-by: Harshit Dwivedi <ha...@pitech.app>
* fix : specify type explicitly for failing apply
Signed-off-by: Harshit Dwivedi <ha...@pitech.app>
---
examples/kotlin/OWNERS | 5 -
.../examples/kotlin/cookbook/BigQueryTornadoes.kt | 195 ++++++++
.../kotlin/cookbook/CombinePerKeyExamples.kt | 187 ++++++++
.../examples/kotlin/cookbook/DistinctExample.kt | 94 ++++
.../examples/kotlin/cookbook/FilterExamples.kt | 229 +++++++++
.../beam/examples/kotlin/cookbook/JoinExamples.kt | 174 +++++++
.../examples/kotlin/cookbook/MaxPerKeyExamples.kt | 155 ++++++
.../apache/beam/examples/kotlin/cookbook/README.md | 71 +++
.../examples/kotlin/cookbook/TriggerExample.kt | 524 +++++++++++++++++++++
.../beam/examples/kotlin/snippets/Snippets.kt | 382 +++++++++++++++
10 files changed, 2011 insertions(+), 5 deletions(-)
diff --git a/examples/kotlin/OWNERS b/examples/kotlin/OWNERS
deleted file mode 100644
index 3811246..0000000
--- a/examples/kotlin/OWNERS
+++ /dev/null
@@ -1,5 +0,0 @@
-# See the OWNERS docs at https://s.apache.org/beam-owners
-
-reviewers:
- - lukecwik
- - aaltay
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt
new file mode 100644
index 0000000..e4019ba
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableFieldSchema
+import com.google.api.services.bigquery.model.TableRow
+import com.google.api.services.bigquery.model.TableSchema
+import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
+import org.apache.beam.sdk.options.*
+import org.apache.beam.sdk.transforms.Count
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.PTransform
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists
+
+
+/**
+ * An example that reads the public samples of weather data from BigQuery, counts the number of
+ * tornadoes that occur in each month, and writes the results to BigQuery.
+ *
+ *
+ * Concepts: Reading/writing BigQuery; counting a PCollection; user-defined PTransforms
+ *
+ *
+ * Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ *
+ * To execute this pipeline locally, specify the BigQuery table for the output with the form:
+ *
+ * <pre>
+ * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * </pre>
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>
+ * --runner=YOUR_SELECTED_RUNNER
+ * </pre>
+ *
+ * See examples/java/README.md for instructions about how to configure different runners.
+ *
+ *
+ * The BigQuery input table defaults to `clouddataflow-readonly:samples.weather_stations`
+ * and can be overridden with `--input`.
+ */
+object BigQueryTornadoes {
+ // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
+ private const val WEATHER_SAMPLES_TABLE = "clouddataflow-readonly:samples.weather_stations"
+
+ /**
+ * Examines each row in the input table. If a tornado was recorded in that sample, the month in
+ * which it occurred is output.
+ */
+ internal class ExtractTornadoesFn : DoFn<TableRow, Int>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
+ if (row["tornado"] as Boolean) {
+ c.output(Integer.parseInt(row["month"] as String))
+ }
+ }
+ }
+
+ /**
+ * Prepares the data for writing to BigQuery by building a TableRow object containing an integer
+ * representation of month and the number of tornadoes that occurred in each month.
+ */
+ internal class FormatCountsFn : DoFn<KV<Int, Long>, TableRow>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val row = TableRow()
+ .set("month", c.element().getKey())
+ .set("tornado_count", c.element().getValue())
+ c.output(row)
+ }
+ }
+
+ /**
+ * Takes rows from a table and generates a table of counts.
+ *
+ *
+ * The input schema is described by https://developers.google.com/bigquery/docs/dataset-gsod .
+ * The output contains the total number of tornadoes found in each month in the following schema:
+ *
+ *
+ * * month: integer
+ * * tornado_count: integer
+ *
+ */
+ internal class CountTornadoes : PTransform<PCollection<TableRow>, PCollection<TableRow>>() {
+ override fun expand(rows: PCollection<TableRow>): PCollection<TableRow> {
+
+ // row... => month...
+ val tornadoes = rows.apply(ParDo.of(ExtractTornadoesFn()))
+
+ // month... => <month,count>...
+ val tornadoCounts = tornadoes.apply(Count.perElement())
+
+ // <month,count>... => row...
+
+ return tornadoCounts.apply(ParDo.of(FormatCountsFn()))
+ }
+ }
+
+ /**
+ * Options supported by [BigQueryTornadoes].
+ *
+ *
+ * Inherits standard configuration options.
+ */
+ interface Options : PipelineOptions {
+ @get:Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
+ @get:Default.String(WEATHER_SAMPLES_TABLE)
+ var input: String
+
+ @get:Description("Mode to use when reading from BigQuery")
+ @get:Default.Enum("EXPORT")
+ var readMethod: Method
+
+ @get:Description("BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
+ @get:Validation.Required
+ var output: String
+ }
+
+ private fun runBigQueryTornadoes(options: Options) {
+ val p = Pipeline.create(options)
+
+ // Build the table schema for the output table.
+ val fields = arrayListOf<TableFieldSchema>(
+ TableFieldSchema().setName("month").setType("INTEGER"),
+ TableFieldSchema().setName("tornado_count").setType("INTEGER")
+ )
+
+ val schema = TableSchema().setFields(fields)
+
+ val rowsFromBigQuery: PCollection<TableRow>
+
+ if (options.readMethod == Method.DIRECT_READ) {
+ // Build the read options proto for the read operation.
+ val tableReadOptions = TableReadOptions.newBuilder()
+ .addAllSelectedFields(Lists.newArrayList("month", "tornado"))
+ .build()
+
+ rowsFromBigQuery = p.apply(
+ BigQueryIO.readTableRows()
+ .from(options.input)
+ .withMethod(Method.DIRECT_READ)
+ .withReadOptions(tableReadOptions))
+ } else {
+ rowsFromBigQuery = p.apply(
+ BigQueryIO.readTableRows()
+ .from(options.input)
+ .withMethod(options.readMethod))
+ }
+
+ rowsFromBigQuery
+ .apply(CountTornadoes())
+ .apply<WriteResult>(
+ BigQueryIO.writeTableRows()
+ .to(options.output)
+ .withSchema(schema)
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE))
+
+ p.run().waitUntilFinish()
+ }
+
+ @JvmStatic
+ fun main(args: Array<String>) {
+ val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as Options
+
+ runBigQueryTornadoes(options)
+ }
+}
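For a quick local check, the extraction step above can be driven from in-memory rows rather than BigQuery. The following is a minimal sketch (not part of this commit), assuming the DirectRunner, the GCP IO module, and Beam's testing artifacts are on the classpath, and that it compiles in the same module (the DoFns are internal):

    import com.google.api.services.bigquery.model.TableRow
    import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder
    import org.apache.beam.sdk.options.PipelineOptionsFactory
    import org.apache.beam.sdk.testing.PAssert
    import org.apache.beam.sdk.transforms.Create
    import org.apache.beam.sdk.transforms.ParDo

    fun main() {
        val p = Pipeline.create(PipelineOptionsFactory.create())
        val rows = listOf(
            TableRow().set("month", "6").set("tornado", true),
            TableRow().set("month", "6").set("tornado", true),
            TableRow().set("month", "7").set("tornado", false))
        // A month is emitted only for rows where a tornado was recorded.
        val months = p
            .apply(Create.of(rows).withCoder(TableRowJsonCoder.of()))
            .apply(ParDo.of(BigQueryTornadoes.ExtractTornadoesFn()))
        PAssert.that(months).containsInAnyOrder(6, 6)
        p.run().waitUntilFinish()
    }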
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt
new file mode 100644
index 0000000..9e38803
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableFieldSchema
+import com.google.api.services.bigquery.model.TableRow
+import com.google.api.services.bigquery.model.TableSchema
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
+import org.apache.beam.sdk.metrics.Metrics
+import org.apache.beam.sdk.options.*
+import org.apache.beam.sdk.transforms.*
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+
+/**
+ * An example that reads the public 'Shakespeare' data, and for each word in the dataset that is
+ * over a given length, generates a string containing the list of play names in which that word
+ * appears, and saves this information to a BigQuery table.
+ *
+ *
+ * Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ *
+ * To execute this pipeline locally, specify the BigQuery table for the output:
+ *
+ * <pre>
+ * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * </pre>
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>
+ * --runner=YOUR_SELECTED_RUNNER
+ * </pre>
+ *
+ * See examples/java/README.md for instructions about how to configure different runners.
+ *
+ *
+ * The BigQuery input table defaults to `publicdata:samples.shakespeare` and can be
+ * overridden with `--input`.
+ */
+object CombinePerKeyExamples {
+ // Use the shakespeare public BigQuery sample
+ private const val SHAKESPEARE_TABLE = "publicdata:samples.shakespeare"
+ // We'll track words >= this word length across all plays in the table.
+ private const val MIN_WORD_LENGTH = 9
+
+ /**
+ * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH,
+ * outputs word, play_name.
+ */
+ internal class ExtractLargeWordsFn : DoFn<TableRow, KV<String, String>>() {
+ private val smallerWords = Metrics.counter(ExtractLargeWordsFn::class.java, "smallerWords")
+
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
+ val playName = row["corpus"] as String
+ val word = row["word"] as String
+ if (word.length >= MIN_WORD_LENGTH) {
+ c.output(KV.of(word, playName))
+ } else {
+ // Track how many smaller words we're not including. This information will be
+ // visible in the Monitoring UI.
+ smallerWords.inc()
+ }
+ }
+ }
+
+ /**
+ * Prepares the data for writing to BigQuery by building a TableRow object containing a word with
+ * a string listing the plays in which it appeared.
+ */
+ internal class FormatShakespeareOutputFn : DoFn<KV<String, String>, TableRow>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val row = TableRow().set("word", c.element().key).set("all_plays", c.element().value)
+ c.output(row)
+ }
+ }
+
+ /**
+ * Reads the public 'Shakespeare' data, and for each word in the dataset over a given length,
+ * generates a string containing the list of play names in which that word appears. It does this
+ * via the Combine.perKey transform, with the ConcatWords combine function.
+ *
+ *
+ * Combine.perKey is similar to a GroupByKey followed by a ParDo, but has more restricted
+ * semantics that allow it to be executed more efficiently. These records are then formatted as BQ
+ * table rows.
+ */
+ internal class PlaysForWord : PTransform<PCollection<TableRow>, PCollection<TableRow>>() {
+ override fun expand(rows: PCollection<TableRow>): PCollection<TableRow> {
+
+ // row... => <word, play_name> ...
+ val words = rows.apply(ParDo.of(ExtractLargeWordsFn()))
+
+ // word, play_name => word, all_plays ...
+ val wordAllPlays = words.apply<PCollection<KV<String, String>>>(Combine.perKey(ConcatWords()))
+
+ // <word, all_plays>... => row...
+
+ return wordAllPlays.apply(ParDo.of(FormatShakespeareOutputFn()))
+ }
+ }
+
+ /**
+ * A 'combine function' used with the Combine.perKey transform. Builds a comma-separated string of
+ * all input items. So, it will build a string containing all the different Shakespeare plays in
+ * which the given input word has appeared.
+ */
+ class ConcatWords : SerializableFunction<Iterable<String>, String> {
+ override fun apply(input: Iterable<String>): String {
+ val all = StringBuilder()
+ for (item in input) {
+ if (item.isNotEmpty()) {
+ if (all.isEmpty()) {
+ all.append(item)
+ } else {
+ all.append(",")
+ all.append(item)
+ }
+ }
+ }
+ return all.toString()
+ }
+ }
+
+ /**
+ * Options supported by [CombinePerKeyExamples].
+ *
+ *
+ * Inherits standard configuration options.
+ */
+ interface Options : PipelineOptions {
+ @get:Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
+ @get:Default.String(SHAKESPEARE_TABLE)
+ var input: String
+
+ @get:Description("Table to write to, specified as <project_id>:<dataset_id>.<table_id>. The dataset_id must already exist")
+ @get:Validation.Required
+ var output: String
+ }
+
+ @Throws(Exception::class)
+ @JvmStatic
+ fun main(args: Array<String>) {
+
+ val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as Options
+ val p = Pipeline.create(options)
+
+ // Build the table schema for the output table.
+ val fields = arrayListOf<TableFieldSchema>(
+ TableFieldSchema().setName("word").setType("STRING"),
+ TableFieldSchema().setName("all_plays").setType("STRING")
+ )
+ val schema = TableSchema().setFields(fields)
+
+ p.apply(BigQueryIO.readTableRows().from(options.input))
+ .apply(PlaysForWord())
+ .apply<WriteResult>(
+ BigQueryIO.writeTableRows()
+ .to(options.output)
+ .withSchema(schema)
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE))
+
+ p.run().waitUntilFinish()
+ }
+}
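Since ConcatWords is an ordinary SerializableFunction, its combining behavior can be checked directly, without building a pipeline. A small sketch with illustrative inputs (note that inside Combine.perKey the iteration order of values, and hence the order of names in the output string, is not guaranteed):

    fun main() {
        val concat = CombinePerKeyExamples.ConcatWords()
        // Non-empty items are joined with commas.
        println(concat.apply(listOf("hamlet", "macbeth", "othello")))  // hamlet,macbeth,othello
        // Empty items are skipped, so no dangling comma is produced.
        println(concat.apply(listOf("", "cymbeline")))                 // cymbeline
    }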
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/DistinctExample.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/DistinctExample.kt
new file mode 100644
index 0000000..f3ea070
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/DistinctExample.kt
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.options.*
+import org.apache.beam.sdk.transforms.Distinct
+import org.apache.beam.sdk.values.PCollection
+
+/**
+ * This example uses as input Shakespeare's plays as plaintext files, and will remove any duplicate
+ * lines across all the files. (The output does not preserve any input order).
+ *
+ * <p>Concepts: the Distinct transform, and how to wire transforms together. Demonstrates {@link
+ * org.apache.beam.sdk.io.TextIO.Read}/ {@link Distinct}/{@link
+ * org.apache.beam.sdk.io.TextIO.Write}.
+ *
+ * <p>To execute this pipeline locally, specify a local output file or output prefix on GCS:
+ * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+ *
+ * <p>To change the runner, specify:
+ *
+ * <pre>{@code
+ * --runner=YOUR_SELECTED_RUNNER
+ * }</pre>
+ *
+ * See examples/java/README.md for instructions about how to configure different runners.
+ *
+ * <p>The input defaults to {@code gs://apache-beam-samples/shakespeare/\*} and can be overridden
+ * with {@code --input}.
+ *
+ */
+
+object DistinctExample {
+
+ /**
+ * Options supported by [DistinctExample].
+ *
+ *
+ * Inherits standard configuration options.
+ */
+
+ interface Options : PipelineOptions {
+ @get:Description("Path to the directory or GCS prefix containing files to read from")
+ @get:Default.String("gs://apache-beam-samples/shakespeare/*")
+ var input: String
+
+ @get:Description("Path of the file to write to")
+ @get:Default.InstanceFactory(OutputFactory::class)
+ var output: String
+
+ /** Returns gs://${TEMP_LOCATION}/deduped.txt. */
+ class OutputFactory : DefaultValueFactory<String> {
+ override fun create(options: PipelineOptions): String {
+ options.tempLocation?.let {
+ return GcsPath.fromUri(it).resolve("deduped.txt").toString()
+ } ?: run {
+ throw IllegalArgumentException("Must specify --output or --tempLocation")
+ }
+ }
+ }
+ }
+
+ @Throws(Exception::class)
+ @JvmStatic
+ fun main(args: Array<String>) {
+
+ val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as Options
+ val p = Pipeline.create(options)
+
+ p.apply<PCollection<String>>("ReadLines", TextIO.read().from(options.input))
+ .apply(Distinct.create<String>())
+ .apply("DedupedShakespeare", TextIO.write().to(options.output))
+
+ p.run().waitUntilFinish()
+ }
+}
\ No newline at end of file
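A minimal sketch (not part of this commit) of the Distinct transform on in-memory lines, assuming the DirectRunner and Beam's testing artifacts are on the classpath:

    import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.options.PipelineOptionsFactory
    import org.apache.beam.sdk.testing.PAssert
    import org.apache.beam.sdk.transforms.Create
    import org.apache.beam.sdk.transforms.Distinct

    fun main() {
        val p = Pipeline.create(PipelineOptionsFactory.create())
        val deduped = p
            .apply(Create.of("to be", "or not to be", "to be"))
            .apply(Distinct.create<String>())
        // Duplicate lines collapse to one occurrence; input order is not preserved.
        PAssert.that(deduped).containsInAnyOrder("to be", "or not to be")
        p.run().waitUntilFinish()
    }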
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt
new file mode 100644
index 0000000..7084726
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableFieldSchema
+import com.google.api.services.bigquery.model.TableRow
+import com.google.api.services.bigquery.model.TableSchema
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
+import org.apache.beam.sdk.options.*
+import org.apache.beam.sdk.transforms.*
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.sdk.values.PCollectionView
+
+import java.util.ArrayList
+import java.util.logging.Logger
+
+/**
+ * This is an example that demonstrates several approaches to filtering, and use of the Mean
+ * transform. It shows how to dynamically set parameters by defining and using new pipeline options,
+ * and how to use a value derived by the pipeline.
+ *
+ *
+ * Concepts: The Mean transform; Options configuration; using pipeline-derived data as a side
+ * input; approaches to filtering, selection, and projection.
+ *
+ *
+ * The example reads public samples of weather data from BigQuery. It performs a projection on
+ * the data, finds the global mean of the temperature readings, filters on readings for a single
+ * given month, and then outputs only data (for that month) that has a mean temp smaller than the
+ * derived global mean.
+ *
+ *
+ * Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ *
+ * To execute this pipeline locally, specify the BigQuery table for the output:
+ *
+ * <pre>
+ * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * [--monthFilter=<month_number>]
+ * </pre>
+ *
+ * where optional parameter `--monthFilter` is set to a number 1-12.
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>
+ * --runner=YOUR_SELECTED_RUNNER
+ * </pre>
+ *
+ * See examples/kotlin/README.md for instructions about how to configure different runners.
+ *
+ *
+ * The BigQuery input table defaults to `clouddataflow-readonly:samples.weather_stations`
+ * and can be overridden with `--input`.
+ */
+object FilterExamples {
+ // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
+ private const val WEATHER_SAMPLES_TABLE = "clouddataflow-readonly:samples.weather_stations"
+ internal val LOG = Logger.getLogger(FilterExamples::class.java.name)
+ internal const val MONTH_TO_FILTER = 7
+
+ /**
+ * Examines each row in the input table. Outputs only the subset of the cells this example is
+ * interested in (the mean_temp and year, month, and day) as a BigQuery table row.
+ */
+ internal class ProjectionFn : DoFn<TableRow, TableRow>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
+ // Grab year, month, day, mean_temp from the row
+ val year = Integer.parseInt(row["year"] as String)
+ val month = Integer.parseInt(row["month"] as String)
+ val day = Integer.parseInt(row["day"] as String)
+ val meanTemp = row["mean_temp"].toString().toDouble()
+
+ // Prepares the data for writing to BigQuery by building a TableRow object
+ val outRow = TableRow()
+ .set("year", year)
+ .set("month", month)
+ .set("day", day)
+ .set("mean_temp", meanTemp)
+ c.output(outRow)
+ }
+ }
+
+ /**
+ * Implements 'filter' functionality.
+ *
+ *
+ * Examines each row in the input table. Outputs only rows from the month monthFilter, which is
+ * passed in as a parameter during construction of this DoFn.
+ */
+ internal class FilterSingleMonthDataFn(var monthFilter: Int?) : DoFn<TableRow, TableRow>() {
+
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
+ val month = row["month"]
+ if (month == this.monthFilter) {
+ c.output(row)
+ }
+ }
+ }
+
+ /**
+ * Examines each row (weather reading) in the input table. Output the temperature reading for that
+ * row ('mean_temp').
+ */
+ internal class ExtractTempFn : DoFn<TableRow, Double>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
+ val meanTemp = java.lang.Double.parseDouble(row["mean_temp"].toString())
+ c.output(meanTemp)
+ }
+ }
+
+ /**
+ * Finds the global mean of the mean_temp for each day/record, and outputs only data that has a
+ * mean temp below this global mean.
+ */
+ internal class BelowGlobalMean(var monthFilter: Int?) : PTransform<PCollection<TableRow>, PCollection<TableRow>>() {
+
+ override fun expand(rows: PCollection<TableRow>): PCollection<TableRow> {
+
+ // Extract the mean_temp from each row.
+ val meanTemps = rows.apply(ParDo.of(ExtractTempFn()))
+
+ // Find the global mean, of all the mean_temp readings in the weather data,
+ // and prepare this singleton PCollectionView for use as a side input.
+ val globalMeanTemp = meanTemps.apply(Mean.globally()).apply(View.asSingleton())
+
+ // Rows filtered to remove all but a single month
+ val monthFilteredRows = rows.apply(ParDo.of(FilterSingleMonthDataFn(monthFilter)))
+
+ // Then, use the global mean as a side input, to further filter the weather data.
+ // By using a side input to pass in the filtering criteria, we can use a value
+ // that is computed earlier in pipeline execution.
+ // We'll only output readings with temperatures below this mean.
+
+ return monthFilteredRows.apply(
+ "ParseAndFilter",
+ ParDo.of(
+ object : DoFn<TableRow, TableRow>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val meanTemp = java.lang.Double.parseDouble(c.element()["mean_temp"].toString())
+ val gTemp = c.sideInput(globalMeanTemp)
+ if (meanTemp < gTemp) {
+ c.output(c.element())
+ }
+ }
+ })
+ .withSideInputs(globalMeanTemp))
+ }
+ }
+
+ /**
+ * Options supported by [FilterExamples].
+ *
+ *
+ * Inherits standard configuration options.
+ */
+ interface Options : PipelineOptions {
+ @get:Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
+ @get:Default.String(WEATHER_SAMPLES_TABLE)
+ var input: String
+
+ @get:Description("Table to write to, specified as <project_id>:<dataset_id>.<table_id>. The dataset_id must already exist")
+ @get:Validation.Required
+ var output: String
+
+ @get:Description("Numeric value of month to filter on")
+ @get:Default.Integer(MONTH_TO_FILTER)
+ var monthFilter: Int?
+ }
+
+ /** Helper method to build the table schema for the output table. */
+ private fun buildWeatherSchemaProjection(): TableSchema {
+ val fields = arrayListOf<TableFieldSchema>(
+ TableFieldSchema().setName("year").setType("INTEGER"),
+ TableFieldSchema().setName("month").setType("INTEGER"),
+ TableFieldSchema().setName("day").setType("INTEGER"),
+ TableFieldSchema().setName("mean_temp").setType("FLOAT")
+ )
+ return TableSchema().setFields(fields)
+ }
+
+ @Throws(Exception::class)
+ @JvmStatic
+ fun main(args: Array<String>) {
+
+ val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as Options
+ val p = Pipeline.create(options)
+
+ val schema = buildWeatherSchemaProjection()
+
+ p.apply(BigQueryIO.readTableRows().from(options.input))
+ .apply(ParDo.of(ProjectionFn()))
+ .apply(BelowGlobalMean(options.monthFilter))
+ .apply<WriteResult>(
+ BigQueryIO.writeTableRows()
+ .to(options.output)
+ .withSchema(schema)
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE))
+
+ p.run().waitUntilFinish()
+ }
+}
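The side-input pattern at the heart of BelowGlobalMean reduces to a few lines. A hedged sketch on plain doubles (DirectRunner and Beam's testing artifacts assumed):

    import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.options.PipelineOptionsFactory
    import org.apache.beam.sdk.testing.PAssert
    import org.apache.beam.sdk.transforms.*

    fun main() {
        val p = Pipeline.create(PipelineOptionsFactory.create())
        val temps = p.apply(Create.of(10.0, 20.0, 60.0))
        // Compute one global mean and materialize it as a singleton side input.
        val globalMean = temps.apply(Mean.globally()).apply(View.asSingleton())
        val belowMean = temps.apply(
            "FilterBelowMean",
            ParDo.of(object : DoFn<Double, Double>() {
                @ProcessElement
                fun processElement(c: ProcessContext) {
                    // The pipeline-derived mean (30.0 here) is read from the side input.
                    if (c.element() < c.sideInput(globalMean)) c.output(c.element())
                }
            }).withSideInputs(globalMean))
        PAssert.that(belowMean).containsInAnyOrder(10.0, 20.0)
        p.run().waitUntilFinish()
    }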
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt
new file mode 100644
index 0000000..3b7f3c4
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableRow
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptions
+import org.apache.beam.sdk.options.PipelineOptionsFactory
+import org.apache.beam.sdk.options.Validation
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.transforms.join.CoGbkResult
+import org.apache.beam.sdk.transforms.join.CoGroupByKey
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.sdk.values.PDone
+import org.apache.beam.sdk.values.TupleTag
+
+/**
+ * This example shows how to do a join on two collections. It uses a sample of the GDELT 'world
+ * event' data (http://goo.gl/OB6oin), joining the event 'action' country code against a table that
+ * maps country codes to country names.
+ *
+ *
+ * Concepts: Join operation; multiple input sources.
+ *
+ *
+ * To execute this pipeline locally, specify a local output file or output prefix on GCS:
+ *
+ * <pre>
+ * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+ * </pre>
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>
+ * --runner=YOUR_SELECTED_RUNNER
+ * </pre>
+ *
+ * See examples/kotlin/README.md for instructions about how to configure different runners.
+ */
+object JoinExamples {
+
+ // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
+ private const val GDELT_EVENTS_TABLE = "clouddataflow-readonly:samples.gdelt_sample"
+ // A table that maps country codes to country names.
+ private const val COUNTRY_CODES = "gdelt-bq:full.crosswalk_geocountrycodetohuman"
+
+ /** Join two collections, using country code as the key. */
+ @Throws(Exception::class)
+ internal fun joinEvents(
+ eventsTable: PCollection<TableRow>, countryCodes: PCollection<TableRow>): PCollection<String> {
+
+ val eventInfoTag = TupleTag<String>()
+ val countryInfoTag = TupleTag<String>()
+
+ // transform both input collections to tuple collections, where the keys are country
+ // codes in both cases.
+ val eventInfo = eventsTable.apply(ParDo.of(ExtractEventDataFn()))
+ val countryInfo = countryCodes.apply(ParDo.of(ExtractCountryInfoFn()))
+
+ // country code 'key' -> CGBKR (<event info>, <country name>)
+ val kvpCollection = KeyedPCollectionTuple.of(eventInfoTag, eventInfo)
+ .and(countryInfoTag, countryInfo)
+ .apply(CoGroupByKey.create())
+
+ // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+ // country code 'key' -> string of <event info>, <country name>
+ val finalResultCollection = kvpCollection.apply(
+ "Process",
+ ParDo.of(
+ object : DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val e = c.element()
+ val countryCode = e.key
+ val countryName = e.value.getOnly(countryInfoTag)
+ for (ei in c.element().value.getAll(eventInfoTag)) {
+ // Generate a string that combines information from both collection values
+ c.output(
+ KV.of<String, String>(
+ countryCode,
+ "Country name: $countryName, Event info: $ei"))
+ }
+ }
+ }))
+
+ // write to GCS
+ return finalResultCollection.apply(
+ "Format",
+ ParDo.of(
+ object : DoFn<KV<String, String>, String>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val outputString = "Country code: ${c.element().key}, ${c.element().value}"
+ c.output(outputString)
+ }
+ }))
+ }
+
+ /**
+ * Examines each row (event) in the input table. Output a KV with the key the country code of the
+ * event, and the value a string encoding event information.
+ */
+ internal class ExtractEventDataFn : DoFn<TableRow, KV<String, String>>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
+ val countryCode = row["ActionGeo_CountryCode"] as String
+ val sqlDate = row["SQLDATE"] as String
+ val actor1Name = row["Actor1Name"] as String
+ val sourceUrl = row["SOURCEURL"] as String
+ val eventInfo = "Date: $sqlDate, Actor1: $actor1Name, url: $sourceUrl"
+ c.output(KV.of(countryCode, eventInfo))
+ }
+ }
+
+ /**
+ * Examines each row (country info) in the input table. Output a KV with the key the country code,
+ * and the value the country name.
+ */
+ internal class ExtractCountryInfoFn : DoFn<TableRow, KV<String, String>>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
+ val countryCode = row["FIPSCC"] as String
+ val countryName = row["HumanName"] as String
+ c.output(KV.of(countryCode, countryName))
+ }
+ }
+
+ /**
+ * Options supported by [JoinExamples].
+ *
+ * Inherits standard configuration options.
+ */
+ interface Options : PipelineOptions {
+ @get:Description("Path of the file to write to")
+ @get:Validation.Required
+ var output: String
+ }
+
+ @Throws(Exception::class)
+ @JvmStatic
+ fun main(args: Array<String>) {
+ val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as Options
+ val p = Pipeline.create(options)
+ // The following two 'apply' calls create multiple inputs to our pipeline,
+ // one for each of our two input sources.
+ val eventsTable = p.apply(BigQueryIO.readTableRows().from(GDELT_EVENTS_TABLE))
+ val countryCodes = p.apply(BigQueryIO.readTableRows().from(COUNTRY_CODES))
+ val formattedResults = joinEvents(eventsTable, countryCodes)
+ formattedResults.apply<PDone>(TextIO.write().to(options.output))
+ p.run().waitUntilFinish()
+ }
+}
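Because joinEvents only needs two PCollection<TableRow> inputs, it can be exercised with in-memory rows instead of the BigQuery reads. A sketch (not part of this commit; it must live in the same module since joinEvents is internal, and the field names mirror what the two extraction DoFns read):

    import com.google.api.services.bigquery.model.TableRow
    import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder
    import org.apache.beam.sdk.options.PipelineOptionsFactory
    import org.apache.beam.sdk.testing.PAssert
    import org.apache.beam.sdk.transforms.Create

    fun main() {
        val p = Pipeline.create(PipelineOptionsFactory.create())
        val event = TableRow()
            .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
            .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com")
        val country = TableRow().set("FIPSCC", "VM").set("HumanName", "Vietnam")
        val joined = JoinExamples.joinEvents(
            p.apply("Events", Create.of(listOf(event)).withCoder(TableRowJsonCoder.of())),
            p.apply("Countries", Create.of(listOf(country)).withCoder(TableRowJsonCoder.of())))
        PAssert.that(joined).containsInAnyOrder(
            "Country code: VM, Country name: Vietnam, " +
                "Event info: Date: 20141212, Actor1: BANGKOK, url: http://cnn.com")
        p.run().waitUntilFinish()
    }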
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt
new file mode 100644
index 0000000..74d392d
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableFieldSchema
+import com.google.api.services.bigquery.model.TableRow
+import com.google.api.services.bigquery.model.TableSchema
+import org.apache.beam.examples.kotlin.cookbook.MaxPerKeyExamples.FormatMaxesFn
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
+import org.apache.beam.sdk.options.*
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.Max
+import org.apache.beam.sdk.transforms.PTransform
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+
+import java.util.ArrayList
+
+/**
+ * An example that reads the public samples of weather data from BigQuery, and finds the maximum
+ * temperature ('mean_temp') for each month.
+ *
+ *
+ * Concepts: The 'Max' statistical combination function, and how to find the max per key group.
+ *
+ *
+ * Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ *
+ * To execute this pipeline locally, specify the BigQuery table for the output with the form:
+ *
+ * <pre>
+ * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * </pre>
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>
+ * --runner=YOUR_SELECTED_RUNNER
+ * </pre>
+ *
+ * See examples/java/README.md for instructions about how to configure different runners.
+ *
+ *
+ * The BigQuery input table defaults to `clouddataflow-readonly:samples.weather_stations`
+ * and can be overridden with `--input`.
+ */
+object MaxPerKeyExamples {
+ // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
+ private const val WEATHER_SAMPLES_TABLE = "clouddataflow-readonly:samples.weather_stations"
+
+ /**
+ * Examines each row (weather reading) in the input table. Output the month of the reading, and
+ * the mean_temp.
+ */
+ internal class ExtractTempFn : DoFn<TableRow, KV<Int, Double>>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
+ val month = Integer.parseInt(row["month"] as String)
+ val meanTemp = java.lang.Double.parseDouble(row["mean_temp"].toString())
+ c.output(KV.of(month, meanTemp))
+ }
+ }
+
+ /** Format the results to a TableRow, to save to BigQuery. */
+ internal class FormatMaxesFn : DoFn<KV<Int, Double>, TableRow>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val row = TableRow()
+ .set("month", c.element().key)
+ .set("max_mean_temp", c.element().value)
+ c.output(row)
+ }
+ }
+
+ /**
+ * Reads rows from a weather data table, and finds the max mean_temp for each month via the 'Max'
+ * statistical combination function.
+ */
+ internal class MaxMeanTemp : PTransform<PCollection<TableRow>, PCollection<TableRow>>() {
+ override fun expand(rows: PCollection<TableRow>): PCollection<TableRow> {
+
+ // row... => <month, mean_temp> ...
+ val temps = rows.apply(ParDo.of(ExtractTempFn()))
+
+ // month, mean_temp... => <month, max mean temp>...
+ val tempMaxes = temps.apply(Max.doublesPerKey())
+
+ // <month, max>... => row...
+
+ return tempMaxes.apply(ParDo.of(FormatMaxesFn()))
+ }
+ }
+
+ /**
+ * Options supported by [MaxPerKeyExamples].
+ *
+ *
+ * Inherits standard configuration options.
+ */
+ interface Options : PipelineOptions {
+ @get:Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
+ @get:Default.String(WEATHER_SAMPLES_TABLE)
+ var input: String
+
+ @get:Description("Table to write to, specified as <project_id>:<dataset_id>.<table_id>")
+ @get:Validation.Required
+ var output: String
+ }
+
+ @Throws(Exception::class)
+ @JvmStatic
+ fun main(args: Array<String>) {
+
+ val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as Options
+ val p = Pipeline.create(options)
+
+ // Build the table schema for the output table.
+ val fields = arrayListOf<TableFieldSchema>(
+ TableFieldSchema().setName("month").setType("INTEGER"),
+ TableFieldSchema().setName("max_mean_temp").setType("FLOAT"))
+
+ val schema = TableSchema().setFields(fields)
+
+ p.apply(BigQueryIO.readTableRows().from(options.input))
+ .apply(MaxMeanTemp())
+ .apply<WriteResult>(
+ BigQueryIO.writeTableRows()
+ .to(options.output)
+ .withSchema(schema)
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE))
+
+ p.run().waitUntilFinish()
+ }
+}
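Max.doublesPerKey can likewise be checked on in-memory key/value pairs. A minimal sketch (DirectRunner and Beam's testing artifacts assumed):

    import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.options.PipelineOptionsFactory
    import org.apache.beam.sdk.testing.PAssert
    import org.apache.beam.sdk.transforms.Create
    import org.apache.beam.sdk.transforms.Max
    import org.apache.beam.sdk.values.KV

    fun main() {
        val p = Pipeline.create(PipelineOptionsFactory.create())
        val maxes = p
            .apply(Create.of(KV.of(6, 85.3), KV.of(6, 45.0), KV.of(7, 75.4)))
            // Keep only the largest reading per month key.
            .apply(Max.doublesPerKey<Int>())
        PAssert.that(maxes).containsInAnyOrder(KV.of(6, 85.3), KV.of(7, 75.4))
        p.run().waitUntilFinish()
    }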
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/README.md b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/README.md
new file mode 100644
index 0000000..a405808
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/README.md
@@ -0,0 +1,71 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# "Cookbook" Examples
+
+This directory holds simple "cookbook" examples, which show how to define
+commonly-used data analysis patterns that you would likely incorporate into a
+larger Apache Beam pipeline. They include:
+
+ <ul>
+ <li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java">BigQueryTornadoes</a>
+ — An example that reads the public samples of weather data from Google
+ BigQuery, counts the number of tornadoes that occur in each month, and
+ writes the results to BigQuery. Demonstrates reading/writing BigQuery,
+ counting a <code>PCollection</code>, and user-defined <code>PTransforms</code>.</li>
+ <li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java">CombinePerKeyExamples</a>
+ — An example that reads the public "Shakespeare" data, and for
+ each word in the dataset that exceeds a given length, generates a string
+ containing the list of play names in which that word appears.
+ Demonstrates the <code>Combine.perKey</code>
+ transform, which lets you combine the values in a key-grouped
+ <code>PCollection</code>.
+ </li>
+ <li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java">DistinctExample</a>
+ — An example that uses Shakespeare's plays as plain text files, and
+ removes duplicate lines across all the files. Demonstrates the
+ <code>Distinct</code>, <code>TextIO.Read</code>,
+ and <code>TextIO.Write</code> transforms, and how to wire transforms together.
+ </li>
+ <li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java">FilterExamples</a>
+ — An example that shows different approaches to filtering, including
+ selection and projection. It also shows how to dynamically set parameters
+ by defining and using new pipeline options, and how to use a value derived
+ by a pipeline. Demonstrates the <code>Mean</code> transform,
+ <code>Options</code> configuration, and using pipeline-derived data as a side
+ input.
+ </li>
+ <li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java">JoinExamples</a>
+ — An example that shows how to join two collections. It uses a
+ sample of the <a href="http://goo.gl/OB6oin">GDELT "world event"
+ data</a>, joining the event <code>action</code> country code against a table
+ that maps country codes to country names. Demonstrates the <code>Join</code>
+ operation, and using multiple input sources.
+ </li>
+ <li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java">MaxPerKeyExamples</a>
+ — An example that reads the public samples of weather data from BigQuery,
+ and finds the maximum temperature (<code>mean_temp</code>) for each month.
+ Demonstrates the <code>Max</code> statistical combination transform, and how to
+ find the max-per-key group.
+ </li>
+ </ul>
+
+See the [documentation](http://beam.apache.org/get-started/quickstart/) and the [Examples
+README](../../../../../../../../README.md) for
+information about how to run these examples.
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
new file mode 100644
index 0000000..1f9a008
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableFieldSchema
+import com.google.api.services.bigquery.model.TableReference
+import com.google.api.services.bigquery.model.TableRow
+import com.google.api.services.bigquery.model.TableSchema
+import org.apache.beam.examples.kotlin.common.ExampleBigQueryTableOptions
+import org.apache.beam.examples.kotlin.common.ExampleOptions
+import org.apache.beam.examples.kotlin.common.ExampleUtils
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptionsFactory
+import org.apache.beam.sdk.options.StreamingOptions
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.GroupByKey
+import org.apache.beam.sdk.transforms.PTransform
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.transforms.windowing.*
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.sdk.values.PCollectionList
+import org.joda.time.Duration
+import org.joda.time.Instant
+
+import java.util.Random
+import java.util.concurrent.TimeUnit
+
+/**
+ * This example illustrates the basic concepts behind triggering. It shows how to use different
+ * trigger definitions to produce partial (speculative) results before all the data is processed and
+ * to control when updated results are produced for late data. The example performs a streaming
+ * analysis of the data coming in from a text file and writes the results to BigQuery. It divides
+ * the data into [windows][Window] to be processed, and demonstrates using various kinds of
+ * [triggers][org.apache.beam.sdk.transforms.windowing.Trigger] to control when the results for
+ * each window are emitted.
+ *
+ *
+ * This example uses a portion of real traffic data from San Diego freeways. It contains readings
+ * from sensor stations set up along each freeway. Each sensor reading includes a calculation of the
+ * 'total flow' across all lanes in that freeway direction.
+ *
+ *
+ * Concepts:
+ *
+ * <pre>
+ * 1. The default triggering behavior
+ * 2. Late data with the default trigger
+ * 3. How to get speculative estimates
+ * 4. Combining late data and speculative estimates
+ * </pre>
+ *
+ *
+ * Before running this example, it will be useful to familiarize yourself with Beam triggers and
+ * understand the concept of 'late data', See: [
+ * https://beam.apache.org/documentation/programming-guide/#triggers](https://beam.apache.org/documentation/programming-guide/#triggers)
+ *
+ *
+ * The example is configured to use the default BigQuery table from the example common package
+ * (there are no defaults for a general Beam pipeline). You can override them with the
+ * `--bigQueryDataset` and `--bigQueryTable` options. If the BigQuery table does not exist, the
+ * example will try to create it.
+ *
+ *
+ * The pipeline outputs its results to a BigQuery table. Here are some queries you can use to see
+ * interesting results: Replace `<enter_table_name>` in the query below with the name of the
+ * BigQuery table. Replace `<enter_window_interval>` in the query below with the window
+ * interval.
+ *
+ *
+ * To see the results of the default trigger: `SELECT * FROM
+ * <enter_table_name> WHERE trigger_type = "default" ORDER BY window DESC`
+ * Note: when you start up your pipeline, you'll initially see results from 'late' data. Wait
+ * until the window duration has passed and the first pane of non-late data has been emitted to
+ * see more interesting results.
+ *
+ *
+ * To see the late data, i.e. the data dropped by the default trigger: `SELECT * FROM
+ * <enter_table_name> WHERE trigger_type = "withAllowedLateness" and (timing = "LATE" or timing =
+ * "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time`
+ *
+ *
+ * To see the difference between accumulation mode and discarding mode, `SELECT * FROM
+ * <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND (trigger_type =
+ * "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY window DESC,
+ * processing_time`
+ *
+ *
+ * To see speculative results every minute, `SELECT * FROM <enter_table_name> WHERE
+ * trigger_type = "speculative" and freeway = "5" ORDER BY window DESC, processing_time`
+ *
+ *
+ * To see speculative results every five minutes after the end of the window, `SELECT * FROM
+ * <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY" and freeway = "5"
+ * ORDER BY window DESC, processing_time`
+ *
+ *
+ * To see the first and the last pane for a freeway in a window for all the trigger types, `SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window`
+ *
+ *
+ * To reduce the number of results for each query, we can add additional WHERE clauses. For
+ * example, to see the results of the default trigger: `SELECT * FROM <enter_table_name>
+ * WHERE trigger_type = "default" AND freeway = "5" AND window = "<enter_window_interval>"`
+ *
+ *
+ * The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
+ * and then exit.
+ */
+object TriggerExample {
+ // Numeric value of fixed window duration, in minutes
+ const val WINDOW_DURATION = 30
+ // Constants used in triggers.
+ // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results.
+ // ONE_MINUTE is used only with processing time before the end of the window
+ val ONE_MINUTE: Duration = Duration.standardMinutes(1)
+ // FIVE_MINUTES is used only with processing time after the end of the window
+ val FIVE_MINUTES: Duration = Duration.standardMinutes(5)
+ // ONE_DAY is used to specify the amount of lateness allowed for the data elements.
+ val ONE_DAY: Duration = Duration.standardDays(1)
+
+ /** Defines the BigQuery schema used for the output. */
+ private val schema: TableSchema
+ get() {
+ val fields = arrayListOf<TableFieldSchema>(
+ TableFieldSchema().setName("trigger_type").setType("STRING"),
+ TableFieldSchema().setName("freeway").setType("STRING"),
+ TableFieldSchema().setName("total_flow").setType("INTEGER"),
+ TableFieldSchema().setName("number_of_records").setType("INTEGER"),
+ TableFieldSchema().setName("window").setType("STRING"),
+ TableFieldSchema().setName("isFirst").setType("BOOLEAN"),
+ TableFieldSchema().setName("isLast").setType("BOOLEAN"),
+ TableFieldSchema().setName("timing").setType("STRING"),
+ TableFieldSchema().setName("event_time").setType("TIMESTAMP"),
+ TableFieldSchema().setName("processing_time").setType("TIMESTAMP")
+ )
+ return TableSchema().setFields(fields)
+ }
+
+ /**
+ * This transform demonstrates using triggers to control when data is produced for each window.
+ * Consider an example to understand the results generated by each type of trigger. The example
+ * uses "freeway" as the key. Event time is the timestamp associated with the data element and
+ * processing time is the time when the data element gets processed in the pipeline. For freeway
+ * 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window:
+ *
+ * Key (freeway) | Value (total_flow) | event time | processing time
+ * 5 | 50 | 10:00:03 | 10:00:47
+ * 5 | 30 | 10:01:00 | 10:01:03
+ * 5 | 30 | 10:02:00 | 11:07:00
+ * 5 | 20 | 10:04:10 | 10:05:15
+ * 5 | 60 | 10:05:00 | 11:03:00
+ * 5 | 20 | 10:05:01 | 11:07:30
+ * 5 | 60 | 10:15:00 | 10:27:15
+ * 5 | 40 | 10:26:40 | 10:26:43
+ * 5 | 60 | 10:27:20 | 10:27:25
+ * 5 | 60 | 10:29:00 | 11:11:00
+ *
+ *
+ * Beam tracks a watermark which records up to what point in event time the data is complete.
+ * For the purposes of the example, we'll assume the watermark is approximately 15m behind the
+ * current processing time. In practice, the actual value would vary over time based on the
+ * system's knowledge of the current delay and contents of the backlog (data that has not yet been
+ * processed).
+ *
+ *
+ * If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
+ * close at 10:44:59, when the watermark passes 10:30:00.
+ */
+ internal class CalculateTotalFlow(private val windowDuration: Int) : PTransform<PCollection<KV<String, Int>>, PCollectionList<TableRow>>() {
+
+ override fun expand(flowInfo: PCollection<KV<String, Int>>): PCollectionList<TableRow> {
+
+ // Concept #1: The default triggering behavior
+ // By default Beam uses a trigger which fires when the watermark has passed the end of the
+ // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+
+ // The system also defaults to dropping late data -- data which arrives after the watermark
+ // has passed the event timestamp of the arriving element. This means that the default trigger
+ // will only fire once.
+
+ // Each pane produced by the default trigger with no allowed lateness will be the first and
+ // last pane in the window, and will be ON_TIME.
+
+ // The results for the example above with the default trigger and zero allowed lateness
+ // would be:
+ // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+ // 5 | 260 | 6 | true | true | ON_TIME
+
+ // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
+ // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
+ // late, and dropped.
+
+ val defaultTriggerResults = flowInfo
+ .apply(
+ "Default",
+ Window
+ // The default window duration values work well if you're running the default
+ // input
+ // file. You may want to adjust the window duration otherwise.
+ .into<KV<String, Int>>(
+ FixedWindows.of(Duration.standardMinutes(windowDuration.toLong())))
+ // The default trigger first emits output when the system's watermark
+ // passes the end of the window.
+ .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+ // Late data is dropped
+ .withAllowedLateness(Duration.ZERO)
+ // Discard elements after emitting each pane.
+ // With no allowed lateness and the specified trigger, there will only be
+ // a single pane, so this doesn't have a noticeable effect. See concept 2
+ // for more details.
+ .discardingFiredPanes())
+ .apply(TotalFlow("default"))
+
+ // Concept #2: Late data with the default trigger
+ // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This
+ // leads to each window staying open for ONE_DAY after the watermark has passed the end of the
+ // window. Any late data will result in an additional pane being fired for that same window.
+
+ // The first pane produced will be ON_TIME and the remaining panes will be LATE.
+ // To guarantee a final pane when the window closes, use
+ // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
+
+ // The results for the example above with the default trigger and ONE_DAY allowed lateness
+ // would be:
+ // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+ // 5 | 260 | 6 | true | false | ON_TIME
+ // 5 | 60 | 1 | false | false | LATE
+ // 5 | 30 | 1 | false | false | LATE
+ // 5 | 20 | 1 | false | false | LATE
+ // 5 | 60 | 1 | false | false | LATE
+ val withAllowedLatenessResults = flowInfo
+ .apply(
+ "WithLateData",
+ Window.into<KV<String, Int>>(
+ FixedWindows.of(Duration.standardMinutes(windowDuration.toLong())))
+ // Late data is emitted as it arrives
+ .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+ // Once the output is produced, the pane is dropped and we start preparing the
+ // next pane for the window.
+ .discardingFiredPanes()
+ // Late data is handled up to one day
+ .withAllowedLateness(ONE_DAY))
+ .apply(TotalFlow("withAllowedLateness"))
+
+ // Concept #3: How to get speculative estimates
+ // We can specify a trigger that fires independently of the watermark, for instance after
+ // ONE_MINUTE of processing time. This allows us to produce speculative estimates before
+ // all the data is available. Since we don't have any triggers that depend on the watermark
+ // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE.
+
+ // We also use accumulatingFiredPanes to build up the results across each pane firing.
+
+ // The results for the example above for this trigger would be:
+ // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+ // 5 | 80 | 2 | true | false | EARLY
+ // 5 | 100 | 3 | false | false | EARLY
+ // 5 | 260 | 6 | false | false | EARLY
+ // 5 | 320 | 7 | false | false | LATE
+ // 5 | 370 | 9 | false | false | LATE
+ // 5 | 430 | 10 | false | false | LATE
+ val speculativeResults = flowInfo
+ .apply(
+ "Speculative",
+ Window.into<KV<String, Int>>(
+ FixedWindows.of(Duration.standardMinutes(windowDuration.toLong())))
+ // Trigger fires every minute.
+ .triggering(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ // Speculative every ONE_MINUTE
+ .plusDelayOf(ONE_MINUTE)))
+ // After emitting each pane, it will continue accumulating the elements so that
+ // each approximation includes all of the previous data in addition to the
+ // newly arrived data.
+ .accumulatingFiredPanes()
+ .withAllowedLateness(ONE_DAY))
+ .apply(TotalFlow("speculative"))
+
+ // Concept #4: Combining late data and speculative estimates
+ // We can put the previous concepts together to get EARLY estimates, an ON_TIME result,
+ // and LATE updates based on late data.
+
+ // Each time a triggering condition is satisfied, it advances to the next trigger.
+ // If there are new elements, this trigger emits a pane under the following conditions:
+ // > Early approximations every minute until the end of the window.
+ // > An on-time firing when the watermark has passed the end of the window.
+ // > Late updates every five minutes thereafter.
+
+ // Every pane produced will either be EARLY, ON_TIME or LATE.
+
+ // The results for the example above for this trigger would be:
+ // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+ // 5 | 80 | 2 | true | false | EARLY
+ // 5 | 100 | 3 | false | false | EARLY
+ // 5 | 260 | 6 | false | false | EARLY
+ // [First pane fired after the end of the window]
+ // 5 | 320 | 7 | false | false | ON_TIME
+ // 5 | 430 | 10 | false | false | LATE
+
+ // For more possibilities of how to build advanced triggers, see {@link Trigger}.
+ val sequentialResults = flowInfo
+ .apply(
+ "Sequential",
+ Window.into<KV<String, Int>>(
+ FixedWindows.of(Duration.standardMinutes(windowDuration.toLong())))
+ .triggering(
+ AfterEach.inOrder(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ // Speculative every ONE_MINUTE
+ .plusDelayOf(ONE_MINUTE))
+ .orFinally(AfterWatermark.pastEndOfWindow()),
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ // Late data every FIVE_MINUTES
+ .plusDelayOf(FIVE_MINUTES))))
+ .accumulatingFiredPanes()
+ // For up to ONE_DAY
+ .withAllowedLateness(ONE_DAY))
+ .apply(TotalFlow("sequential"))
+
+ // Adds the results generated by each trigger type to a PCollectionList.
+
+ return PCollectionList.of<TableRow>(defaultTriggerResults)
+ .and(withAllowedLatenessResults)
+ .and(speculativeResults)
+ .and(sequentialResults)
+ }
+ }
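+
+ /**
+ * A minimal testing sketch, not part of the original example: assuming the Beam Java SDK's
+ * testing classes are available, TestStream can script element arrival and watermark
+ * movement so the panes produced by the triggers above can be observed deterministically.
+ * The freeway readings and timestamps below are illustrative placeholders.
+ */
+ internal fun totalFlowWithTestStream(pipeline: Pipeline): PCollectionList<TableRow> {
+ val events = org.apache.beam.sdk.testing.TestStream.create(
+ org.apache.beam.sdk.coders.KvCoder.of(
+ org.apache.beam.sdk.coders.StringUtf8Coder.of(),
+ org.apache.beam.sdk.coders.VarIntCoder.of()))
+ // Two readings for freeway 5 inside the [10:00:00, 10:30:00) window.
+ .addElements(
+ org.apache.beam.sdk.values.TimestampedValue.of(
+ KV.of("5", 50), Instant.parse("2010-01-01T10:00:03Z")),
+ org.apache.beam.sdk.values.TimestampedValue.of(
+ KV.of("5", 30), Instant.parse("2010-01-01T10:01:00Z")))
+ // Advance the watermark past the end of the window so the ON_TIME panes fire.
+ .advanceWatermarkTo(Instant.parse("2010-01-01T10:31:00Z"))
+ .advanceWatermarkToInfinity()
+ return pipeline.apply(events).apply(CalculateTotalFlow(30))
+ }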
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // The remaining parts of the pipeline are needed to produce the output for each
+ // concept above. Not directly relevant to understanding the trigger examples.
+
+ /**
+ * Calculate total flow and number of records for each freeway and format the results to TableRow
+ * objects, to save to BigQuery.
+ */
+ internal class TotalFlow(private val triggerType: String) : PTransform<PCollection<KV<String, Int>>, PCollection<TableRow>>() {
+
+ override fun expand(flowInfo: PCollection<KV<String, Int>>): PCollection<TableRow> {
+ val flowPerFreeway = flowInfo.apply(GroupByKey.create())
+
+ val results = flowPerFreeway.apply(
+ ParDo.of(
+ object : DoFn<KV<String, Iterable<Int>>, KV<String, String>>() {
+
+ @ProcessElement
+ @Throws(Exception::class)
+ fun processElement(c: ProcessContext) {
+ val flows = c.element().value
+ var sum = 0
+ var numberOfRecords = 0L
+ for (value in flows) {
+ sum += value
+ numberOfRecords++
+ }
+ c.output(KV.of<String, String>(c.element().key, "$sum,$numberOfRecords"))
+ }
+ }))
+ return results.apply(ParDo.of(FormatTotalFlow(triggerType)))
+ }
+ }
+
+ /**
+ * Format the results of the Total flow calculation to a TableRow, to save to BigQuery. Adds the
+ * triggerType, pane information, processing time and the window timestamp.
+ */
+ internal class FormatTotalFlow(private val triggerType: String) : DoFn<KV<String, String>, TableRow>() {
+
+ @ProcessElement
+ @Throws(Exception::class)
+ fun processElement(c: ProcessContext, window: BoundedWindow) {
+ val values = c.element().value.split(",".toRegex()).toTypedArray()
+ val row = TableRow()
+ .set("trigger_type", triggerType)
+ .set("freeway", c.element().key)
+ .set("total_flow", Integer.parseInt(values[0]))
+ .set("number_of_records", java.lang.Long.parseLong(values[1]))
+ .set("window", window.toString())
+ .set("isFirst", c.pane().isFirst)
+ .set("isLast", c.pane().isLast)
+ .set("timing", c.pane().timing.toString())
+ .set("event_time", c.timestamp().toString())
+ .set("processing_time", Instant.now().toString())
+ c.output(row)
+ }
+ }
+
+ /**
+ * Extract the freeway and total flow in a reading. Freeway is used as key since we are
+ * calculating the total flow for each freeway.
+ */
+ internal class ExtractFlowInfo : DoFn<String, KV<String, Int>>() {
+
+ @ProcessElement
+ @Throws(Exception::class)
+ fun processElement(c: ProcessContext) {
+ val laneInfo = c.element().split(",".toRegex()).toTypedArray()
+ if ("timestamp" == laneInfo[0]) {
+ // Header row
+ return
+ }
+ if (laneInfo.size < VALID_NUM_FIELDS) {
+ // Skip the invalid input.
+ return
+ }
+ val freeway = laneInfo[2]
+ val totalFlow = tryIntegerParse(laneInfo[7])
+ // Ignore records with total flow 0 to make the trigger behavior easier to follow.
+ // Skip records with total flow -1 since they are invalid input.
+ if (totalFlow == null || totalFlow <= 0) {
+ return
+ }
+ c.output(KV.of<String, Int>(freeway, totalFlow))
+ }
+
+ companion object {
+ private const val VALID_NUM_FIELDS = 50
+ }
+ }
+
+ /** Inherits standard configuration options. */
+ interface TrafficFlowOptions : ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
+
+ @get:Description("Input file to read from")
+ @get:Default.String("gs://apache-beam-samples/traffic_sensor/Freeways-5Minaa2010-01-01_to_2010-02-15.csv")
+ var input: String
+
+ @get:Description("Numeric value of window duration for fixed windows, in minutes")
+ @get:Default.Integer(WINDOW_DURATION)
+ var windowDuration: Int?
+ }
+
+ @Throws(Exception::class)
+ @JvmStatic
+ fun main(args: Array<String>) {
+ val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(TrafficFlowOptions::class.java)
+ options.isStreaming = true
+
+ options.bigQuerySchema = schema
+
+ val exampleUtils = ExampleUtils(options)
+ exampleUtils.setup()
+
+ val pipeline = Pipeline.create(options)
+
+ val tableRef = getTableReference(
+ options.project, options.bigQueryDataset, options.bigQueryTable)
+
+ val resultList = pipeline
+ .apply("ReadMyFile", TextIO.read().from(options.input))
+ .apply("InsertRandomDelays", ParDo.of(InsertDelays()))
+ .apply(ParDo.of(ExtractFlowInfo()))
+ .apply(CalculateTotalFlow(options.windowDuration!!))
+
+ for (resultTable in resultList.all) {
+ resultTable.apply<WriteResult>(BigQueryIO.writeTableRows().to(tableRef).withSchema(schema))
+ }
+
+ val result = pipeline.run()
+
+ // ExampleUtils will try to cancel the pipeline and the injector before the program exits.
+ exampleUtils.waitToFinish(result)
+ }
+
+ /** Add current time to each record. Also insert a delay at random to demo the triggers. */
+ class InsertDelays : DoFn<String, String>() {
+
+ @ProcessElement
+ @Throws(Exception::class)
+ fun processElement(c: ProcessContext) {
+ var timestamp = Instant.now()
+ val random = Random()
+ if (random.nextDouble() < THRESHOLD) {
+ val range = MAX_DELAY - MIN_DELAY
+ val delayInMinutes = random.nextInt(range) + MIN_DELAY
+ val delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes.toLong())
+ timestamp = Instant(timestamp.millis - delayInMillis)
+ }
+ c.outputWithTimestamp(c.element(), timestamp)
+ }
+
+ companion object {
+ private const val THRESHOLD = 0.001
+ // MIN_DELAY and MAX_DELAY in minutes.
+ private const val MIN_DELAY = 1
+ private const val MAX_DELAY = 100
+ }
+ }
+
+ /** Sets the table reference. */
+ private fun getTableReference(project: String, dataset: String, table: String): TableReference {
+ return TableReference().apply {
+ projectId = project
+ datasetId = dataset
+ tableId = table
+ }
+ }
+
+ private fun tryIntegerParse(number: String): Int? = number.toIntOrNull()
+}
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
new file mode 100644
index 0000000..6af6ff1
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.snippets
+
+import com.google.api.services.bigquery.model.*
+import com.google.common.collect.ImmutableList
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.coders.AvroCoder
+import org.apache.beam.sdk.coders.DefaultCoder
+import org.apache.beam.sdk.coders.DoubleCoder
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition
+import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations
+import org.apache.beam.sdk.io.gcp.bigquery.TableDestination
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
+import org.apache.beam.sdk.transforms.*
+import org.apache.beam.sdk.transforms.join.CoGbkResult
+import org.apache.beam.sdk.transforms.join.CoGroupByKey
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple
+import org.apache.beam.sdk.values.*
+
+/** Code snippets used in webdocs. */
+object Snippets {
+
+ @DefaultCoder(AvroCoder::class)
+ internal class Quote(
+ val source: String = "",
+ val quote: String = ""
+ )
+
+ @DefaultCoder(AvroCoder::class)
+ internal class WeatherData(
+ val year: Long = 0,
+ val month: Long = 0,
+ val day: Long = 0,
+ val maxTemp: Double = 0.0
+ )
+
+ /** Using a Read and Write transform to read/write from/to BigQuery. */
+ @JvmOverloads
+ @SuppressFBWarnings("SE_BAD_FIELD")
+ // FindBugs doesn't like that a non-serializable object (the Pipeline) is used inside the run {} blocks.
+ fun modelBigQueryIO(
+ pipeline: Pipeline, writeProject: String = "", writeDataset: String = "", writeTable: String = "") {
+ run {
+ // [START BigQueryTableSpec]
+ val tableSpec = "clouddataflow-readonly:samples.weather_stations"
+ // [END BigQueryTableSpec]
+ }
+
+ run {
+ // [START BigQueryTableSpecWithoutProject]
+ val tableSpec = "samples.weather_stations"
+ // [END BigQueryTableSpecWithoutProject]
+ }
+
+ run {
+ // [START BigQueryTableSpecObject]
+ val tableSpec = TableReference()
+ .setProjectId("clouddataflow-readonly")
+ .setDatasetId("samples")
+ .setTableId("weather_stations")
+ // [END BigQueryTableSpecObject]
+ }
+
+ run {
+ val tableSpec = "clouddataflow-readonly:samples.weather_stations"
+ // [START BigQueryReadTable]
+ val maxTemperatures = pipeline.apply(BigQueryIO.readTableRows().from(tableSpec))
+ // Each row is of type TableRow
+ .apply<PCollection<Double>>(
+ MapElements.into(TypeDescriptors.doubles())
+ .via(SerializableFunction<TableRow, Double> {
+ it["max_temperature"] as Double
+ })
+ )
+ // [END BigQueryReadTable]
+ }
+
+ run {
+ val tableSpec = "clouddataflow-readonly:samples.weather_stations"
+ // [START BigQueryReadFunction]
+ val maxTemperatures = pipeline.apply(
+ BigQueryIO.read { it.record["max_temperature"] as Double }
+ .from(tableSpec)
+ .withCoder(DoubleCoder.of()))
+ // [END BigQueryReadFunction]
+ }
+
+ run {
+ // [START BigQueryReadQuery]
+ val maxTemperatures = pipeline.apply(
+ BigQueryIO.read { it.record["max_temperature"] as Double }
+ .fromQuery(
+ "SELECT max_temperature FROM [clouddataflow-readonly:samples.weather_stations]")
+ .withCoder(DoubleCoder.of()))
+ // [END BigQueryReadQuery]
+ }
+
+ run {
+ // [START BigQueryReadQueryStdSQL]
+ val maxTemperatures = pipeline.apply(
+ BigQueryIO.read { it.record["max_temperature"] as Double }
+ .fromQuery(
+ "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
+ .usingStandardSql()
+ .withCoder(DoubleCoder.of()))
+ // [END BigQueryReadQueryStdSQL]
+ }
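+
+ run {
+ // A follow-on sketch, not one of the published snippets: aggregating the values read
+ // above with the built-in Mean transform to get a single global average temperature.
+ val maxTemperatures = pipeline.apply(
+ BigQueryIO.read { it.record["max_temperature"] as Double }
+ .from("clouddataflow-readonly:samples.weather_stations")
+ .withCoder(DoubleCoder.of()))
+ val meanTemperature: PCollection<Double> = maxTemperatures.apply(Mean.globally<Double>())
+ }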
+
+ // [START BigQuerySchemaJson]
+ val tableSchemaJson = """
+ {
+   "fields": [
+     {
+       "name": "source",
+       "type": "STRING",
+       "mode": "NULLABLE"
+     },
+     {
+       "name": "quote",
+       "type": "STRING",
+       "mode": "REQUIRED"
+     }
+   ]
+ }
+ """.trimIndent()
+ // [END BigQuerySchemaJson]
+
+ run {
+ var tableSpec = "clouddataflow-readonly:samples.weather_stations"
+ if (writeProject.isNotEmpty() && writeDataset.isNotEmpty() && writeTable.isNotEmpty()) {
+ tableSpec = "$writeProject:$writeDataset.$writeTable"
+ }
+
+ // [START BigQuerySchemaObject]
+ val tableSchema = TableSchema()
+ .setFields(
+ ImmutableList.of(
+ TableFieldSchema()
+ .setName("source")
+ .setType("STRING")
+ .setMode("NULLABLE"),
+ TableFieldSchema()
+ .setName("quote")
+ .setType("STRING")
+ .setMode("REQUIRED")))
+ // [END BigQuerySchemaObject]
+
+ // [START BigQueryWriteInput]
+ /*
+ @DefaultCoder(AvroCoder::class)
+ class Quote(
+ val source: String = "",
+ val quote: String = ""
+ )
+ */
+
+ val quotes = pipeline.apply(
+ Create.of(
+ Quote("Mahatma Gandhi", "My life is my message."),
+ Quote("Yoda", "Do, or do not. There is no 'try'.")))
+ // [END BigQueryWriteInput]
+
+ // [START BigQueryWriteTable]
+ quotes
+ .apply<PCollection<TableRow>>(
+ MapElements.into(TypeDescriptor.of(TableRow::class.java))
+ .via(SerializableFunction<Quote, TableRow> { TableRow().set("source", it.source).set("quote", it.quote) }))
+ .apply<WriteResult>(
+ BigQueryIO.writeTableRows()
+ .to(tableSpec)
+ .withSchema(tableSchema)
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))
+ // [END BigQueryWriteTable]
+
+ // [START BigQueryWriteFunction]
+ quotes.apply<WriteResult>(
+ BigQueryIO.write<Quote>()
+ .to(tableSpec)
+ .withSchema(tableSchema)
+ .withFormatFunction { TableRow().set("source", it.source).set("quote", it.quote) }
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))
+ // [END BigQueryWriteFunction]
+
+ // [START BigQueryWriteJsonSchema]
+ quotes.apply<WriteResult>(
+ BigQueryIO.write<Quote>()
+ .to(tableSpec)
+ .withJsonSchema(tableSchemaJson)
+ .withFormatFunction { TableRow().set("source", it.source).set("quote", it.quote) }
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))
+ // [END BigQueryWriteJsonSchema]
+ }
+
+ run {
+ // [START BigQueryWriteDynamicDestinations]
+
+ /*
+ @DefaultCoder(AvroCoder::class)
+ class WeatherData(
+ val year: Long = 0,
+ val month: Long = 0,
+ val day: Long = 0,
+ val maxTemp: Double = 0.0
+ )
+ */
+ val weatherData = pipeline.apply(
+ BigQueryIO.read {
+ val record = it.record
+ WeatherData(
+ record.get("year") as Long,
+ record.get("month") as Long,
+ record.get("day") as Long,
+ record.get("max_temperature") as Double)
+ }
+ .fromQuery("""
+ SELECT year, month, day, max_temperature
+ FROM [clouddataflow-readonly:samples.weather_stations]
+ WHERE year BETWEEN 2007 AND 2009
+ """.trimIndent())
+ .withCoder(AvroCoder.of(WeatherData::class.java)))
+
+ // We will send the weather data into different tables for every year.
+ weatherData.apply<WriteResult>(
+ BigQueryIO.write<WeatherData>()
+ .to(
+ object : DynamicDestinations<WeatherData, Long>() {
+ override fun getDestination(elem: ValueInSingleWindow<WeatherData>): Long? {
+ return elem.value!!.year
+ }
+
+ override fun getTable(destination: Long?): TableDestination {
+ return TableDestination(
+ TableReference()
+ .setProjectId(writeProject)
+ .setDatasetId(writeDataset)
+ .setTableId("${writeTable}_$destination"),
+ "Table for year $destination")
+ }
+
+ override fun getSchema(destination: Long?): TableSchema {
+ return TableSchema()
+ .setFields(
+ ImmutableList.of(
+ TableFieldSchema()
+ .setName("year")
+ .setType("INTEGER")
+ .setMode("REQUIRED"),
+ TableFieldSchema()
+ .setName("month")
+ .setType("INTEGER")
+ .setMode("REQUIRED"),
+ TableFieldSchema()
+ .setName("day")
+ .setType("INTEGER")
+ .setMode("REQUIRED"),
+ TableFieldSchema()
+ .setName("maxTemp")
+ .setType("FLOAT")
+ .setMode("NULLABLE")))
+ }
+ })
+ .withFormatFunction {
+ TableRow()
+ .set("year", it.year)
+ .set("month", it.month)
+ .set("day", it.day)
+ .set("maxTemp", it.maxTemp)
+ }
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))
+ // [END BigQueryWriteDynamicDestinations]
+
+ var tableSpec = "clouddataflow-readonly:samples.weather_stations"
+ if (writeProject.isNotEmpty() && writeDataset.isNotEmpty() && writeTable.isNotEmpty()) {
+ tableSpec = "$writeProject:$writeDataset.${writeTable}_partitioning"
+ }
+
+ val tableSchema = TableSchema()
+ .setFields(
+ ImmutableList.of(
+ TableFieldSchema().setName("year").setType("INTEGER").setMode("REQUIRED"),
+ TableFieldSchema()
+ .setName("month")
+ .setType("INTEGER")
+ .setMode("REQUIRED"),
+ TableFieldSchema().setName("day").setType("INTEGER").setMode("REQUIRED"),
+ TableFieldSchema()
+ .setName("maxTemp")
+ .setType("FLOAT")
+ .setMode("NULLABLE")))
+
+ // [START BigQueryTimePartitioning]
+ weatherData.apply<WriteResult>(
+ BigQueryIO.write<WeatherData>()
+ .to("${tableSpec}_partitioning")
+ .withSchema(tableSchema)
+ .withFormatFunction {
+ TableRow()
+ .set("year", it.year)
+ .set("month", it.month)
+ .set("day", it.day)
+ .set("maxTemp", it.maxTemp)
+ }
+ // NOTE: an existing table without time partitioning set up will not work
+ .withTimePartitioning(TimePartitioning().setType("DAY"))
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))
+ // [END BigQueryTimePartitioning]
+ }
+ }
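+
+ /**
+ * A minimal usage sketch, not part of the published snippets: building a pipeline from
+ * command-line args and exercising modelBigQueryIO. The project, dataset and table names
+ * are placeholder assumptions.
+ */
+ fun modelBigQueryIOExample(args: Array<String>) {
+ val options = org.apache.beam.sdk.options.PipelineOptionsFactory.fromArgs(*args).withValidation().create()
+ val pipeline = Pipeline.create(options)
+ modelBigQueryIO(pipeline, "my-project", "my_dataset", "my_table")
+ pipeline.run().waitUntilFinish()
+ }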
+
+ /** Helper function to format results in coGroupByKeyTuple. */
+ fun formatCoGbkResults(
+ name: String?, emails: Iterable<String>, phones: Iterable<String>): String {
+
+ val emailsList = ArrayList<String>()
+ for (elem in emails) {
+ emailsList.add("'$elem'")
+ }
+ emailsList.sort()
+ val emailsStr = "[${emailsList.joinToString(", ")}]"
+
+ val phonesList = ArrayList<String>()
+ for (elem in phones) {
+ phonesList.add("'$elem'")
+ }
+ phonesList.sort()
+ val phonesStr = "[${phonesList.joinToString(", ")}]"
+
+ return "$name; $emailsStr; $phonesStr"
+ }
+
+ /** Using a CoGroupByKey transform. */
+ fun coGroupByKeyTuple(
+ emailsTag: TupleTag<String>,
+ phonesTag: TupleTag<String>,
+ emails: PCollection<KV<String, String>>,
+ phones: PCollection<KV<String, String>>): PCollection<String> {
+
+ // [START CoGroupByKeyTuple]
+ val results = KeyedPCollectionTuple.of(emailsTag, emails)
+ .and(phonesTag, phones)
+ .apply(CoGroupByKey.create())
+ // [END CoGroupByKeyTuple]
+
+ return results.apply(
+ ParDo.of(
+ object : DoFn<KV<String, CoGbkResult>, String>() {
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ val e = c.element()
+ val name = e.key
+ val emailsIter = e.value.getAll(emailsTag)
+ val phonesIter = e.value.getAll(phonesTag)
+ val formattedResult = formatCoGbkResults(name, emailsIter, phonesIter)
+ c.output(formattedResult)
+ }
+ }))
+ }
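+
+ /**
+ * A minimal usage sketch, not part of the published snippets: wiring coGroupByKeyTuple
+ * with in-memory inputs. The names, addresses and phone numbers are illustrative
+ * placeholders.
+ */
+ fun coGroupByKeyTupleExample(pipeline: Pipeline): PCollection<String> {
+ val emailsTag = TupleTag<String>()
+ val phonesTag = TupleTag<String>()
+ val emails = pipeline.apply(
+ "CreateEmails", Create.of(KV.of("amy", "amy@example.com"), KV.of("james", "james@example.com")))
+ val phones = pipeline.apply(
+ "CreatePhones", Create.of(KV.of("amy", "111-222-3333"), KV.of("james", "222-333-4444")))
+ // Produces elements such as: "amy; ['amy@example.com']; ['111-222-3333']"
+ return coGroupByKeyTuple(emailsTag, phonesTag, emails, phones)
+ }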
+}