Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/14 20:34:34 UTC

[beam] branch master updated: [BEAM-6493] : Add cookbook and snippets examples in Kotlin (#8439)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b988307  [BEAM-6493] : Add cookbook and snippets examples in Kotlin (#8439)
b988307 is described below

commit b9883076487334d5abd34e1db42650c095c0b4d3
Author: Harshit Dwivedi <47...@users.noreply.github.com>
AuthorDate: Tue May 14 13:34:20 2019 -0700

    [BEAM-6493] : Add cookbook and snippets examples in Kotlin (#8439)
    
    * feat : convert snippets to kotlin:
    
    Signed-off-by: Harshit Dwivedi <ha...@pitech.app>
    
    * add kotlin samples for cookbook
    
    Signed-off-by: Harshit Dwivedi <ha...@pitech.app>
    
    * fix : findbugs errors
    
    Signed-off-by: Harshit Dwivedi <ha...@pitech.app>
    
    * fix : specify type explicitly for failing apply
    
    Signed-off-by: Harshit Dwivedi <ha...@pitech.app>
---
 examples/kotlin/OWNERS                             |   5 -
 .../examples/kotlin/cookbook/BigQueryTornadoes.kt  | 195 ++++++++
 .../kotlin/cookbook/CombinePerKeyExamples.kt       | 187 ++++++++
 .../examples/kotlin/cookbook/DistinctExample.kt    |  94 ++++
 .../examples/kotlin/cookbook/FilterExamples.kt     | 229 +++++++++
 .../beam/examples/kotlin/cookbook/JoinExamples.kt  | 174 +++++++
 .../examples/kotlin/cookbook/MaxPerKeyExamples.kt  | 155 ++++++
 .../apache/beam/examples/kotlin/cookbook/README.md |  71 +++
 .../examples/kotlin/cookbook/TriggerExample.kt     | 524 +++++++++++++++++++++
 .../beam/examples/kotlin/snippets/Snippets.kt      | 382 +++++++++++++++
 10 files changed, 2011 insertions(+), 5 deletions(-)

diff --git a/examples/kotlin/OWNERS b/examples/kotlin/OWNERS
deleted file mode 100644
index 3811246..0000000
--- a/examples/kotlin/OWNERS
+++ /dev/null
@@ -1,5 +0,0 @@
-# See the OWNERS docs at https://s.apache.org/beam-owners
-
-reviewers:
-  - lukecwik
-  - aaltay
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt
new file mode 100644
index 0000000..e4019ba
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableFieldSchema
+import com.google.api.services.bigquery.model.TableRow
+import com.google.api.services.bigquery.model.TableSchema
+import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
+import org.apache.beam.sdk.options.*
+import org.apache.beam.sdk.transforms.Count
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.PTransform
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists
+
+
+/**
+ * An example that reads the public samples of weather data from BigQuery, counts the number of
+ * tornadoes that occur in each month, and writes the results to BigQuery.
+ *
+ *
+ * Concepts: Reading/writing BigQuery; counting a PCollection; user-defined PTransforms
+ *
+ *
+ * Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ *
+ * To execute this pipeline locally, specify the BigQuery table for the output with the form:
+ *
+ * <pre>`--output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+`</pre> *
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>`--runner=YOUR_SELECTED_RUNNER
+`</pre> *
+ *
+ * See examples/java/README.md for instructions about how to configure different runners.
+ *
+ *
+ * The BigQuery input table defaults to `clouddataflow-readonly:samples.weather_stations`
+ * and can be overridden with `--input`.
+ */
+object BigQueryTornadoes {
+    // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
+    private const val WEATHER_SAMPLES_TABLE = "clouddataflow-readonly:samples.weather_stations"
+
+    /**
+     * Examines each row in the input table. If a tornado was recorded in that sample, the month in
+     * which it occurred is output.
+     */
+    internal class ExtractTornadoesFn : DoFn<TableRow, Int>() {
+        @ProcessElement
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
+            if (row["tornado"] as Boolean) {
+                c.output(Integer.parseInt(row["month"] as String))
+            }
+        }
+    }
+
+    /**
+     * Prepares the data for writing to BigQuery by building a TableRow object containing an integer
+     * representation of month and the number of tornadoes that occurred in each month.
+     */
+    internal class FormatCountsFn : DoFn<KV<Int, Long>, TableRow>() {
+        @ProcessElement
+        fun processElement(c: ProcessContext) {
+            val row = TableRow()
+                    .set("month", c.element().getKey())
+                    .set("tornado_count", c.element().getValue())
+            c.output(row)
+        }
+    }
+
+    /**
+     * Takes rows from a table and generates a table of counts.
+     *
+     *
+     * The input schema is described by https://developers.google.com/bigquery/docs/dataset-gsod .
+     * The output contains the total number of tornadoes found in each month in the following schema:
+     *
+     *
+     *  * month: integer
+     *  * tornado_count: integer
+     *
+     */
+    internal class CountTornadoes : PTransform<PCollection<TableRow>, PCollection<TableRow>>() {
+        override fun expand(rows: PCollection<TableRow>): PCollection<TableRow> {
+
+            // row... => month...
+            val tornadoes = rows.apply(ParDo.of(ExtractTornadoesFn()))
+
+            // month... => <month,count>...
+            val tornadoCounts = tornadoes.apply(Count.perElement())
+
+            // <month,count>... => row...
+
+            return tornadoCounts.apply(ParDo.of(FormatCountsFn()))
+        }
+    }
+
+    /**
+     * Options supported by [BigQueryTornadoes].
+     *
+     *
+     * Inherits standard configuration options.
+     */
+    interface Options : PipelineOptions {
+        @get:Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
+        @get:Default.String(WEATHER_SAMPLES_TABLE)
+        var input: String
+
+        @get:Description("Mode to use when reading from BigQuery")
+        @get:Default.Enum("EXPORT")
+        var readMethod: Method
+
+        @get:Description("BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
+        @get:Validation.Required
+        var output: String
+    }
+
+    private fun runBigQueryTornadoes(options: Options) {
+        val p = Pipeline.create(options)
+
+        // Build the table schema for the output table.
+        val fields = arrayListOf<TableFieldSchema>(
+                TableFieldSchema().setName("month").setType("INTEGER"),
+                TableFieldSchema().setName("tornado_count").setType("INTEGER")
+        )
+
+        val schema = TableSchema().setFields(fields)
+
+        val rowsFromBigQuery: PCollection<TableRow>
+
+        if (options.readMethod == Method.DIRECT_READ) {
+            // Build the read options proto for the read operation.
+            val tableReadOptions = TableReadOptions.newBuilder()
+                    .addAllSelectedFields(Lists.newArrayList("month", "tornado"))
+                    .build()
+
+            rowsFromBigQuery = p.apply(
+                    BigQueryIO.readTableRows()
+                            .from(options.input)
+                            .withMethod(Method.DIRECT_READ)
+                            .withReadOptions(tableReadOptions))
+        } else {
+            rowsFromBigQuery = p.apply(
+                    BigQueryIO.readTableRows()
+                            .from(options.input)
+                            .withMethod(options.readMethod))
+        }
+
+        rowsFromBigQuery
+                .apply(CountTornadoes())
+                .apply<WriteResult>(
+                        BigQueryIO.writeTableRows()
+                                .to(options.output)
+                                .withSchema(schema)
+                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE))
+
+        p.run().waitUntilFinish()
+    }
+
+    @JvmStatic
+    fun main(args: Array<String>) {
+        val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as Options
+
+        runBigQueryTornadoes(options)
+    }
+}
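
The CountTornadoes transform above boils down to Count.perElement() over the months emitted by
ExtractTornadoesFn. As a minimal, runnable illustration (not part of this change), the same
count-per-key pattern can be exercised against an in-memory list of months; the month values below
are invented and the DirectRunner is assumed to be on the classpath.

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.Count
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV

fun main() {
    val p = Pipeline.create(PipelineOptionsFactory.create())

    // Months in which a tornado was observed, standing in for ExtractTornadoesFn output.
    p.apply(Create.of(listOf(1, 1, 2, 2, 2, 7)))
            // month... => <month, count>..., exactly as in CountTornadoes.
            .apply(Count.perElement())
            .apply(ParDo.of(object : DoFn<KV<Int, Long>, Void>() {
                @ProcessElement
                fun processElement(c: ProcessContext) {
                    println("month=${c.element().key} tornado_count=${c.element().value}")
                }
            }))

    p.run().waitUntilFinish()
}
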
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt
new file mode 100644
index 0000000..9e38803
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableFieldSchema
+import com.google.api.services.bigquery.model.TableRow
+import com.google.api.services.bigquery.model.TableSchema
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
+import org.apache.beam.sdk.metrics.Metrics
+import org.apache.beam.sdk.options.*
+import org.apache.beam.sdk.transforms.*
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+
+/**
+ * An example that reads the public 'Shakespeare' data, and for each word in the dataset that is
+ * over a given length, generates a string containing the list of play names in which that word
+ * appears, and saves this information to a bigquery table.
+ *
+ *
+ * Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ *
+ * To execute this pipeline locally, specify the BigQuery table for the output:
+ *
+ * <pre>`--output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+`</pre> *
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>`--runner=YOUR_SELECTED_RUNNER
+`</pre> *
+ *
+ * See examples/java/README.md for instructions about how to configure different runners.
+ *
+ *
+ * The BigQuery input table defaults to `publicdata:samples.shakespeare` and can be
+ * overridden with `--input`.
+ */
+object CombinePerKeyExamples {
+    // Use the shakespeare public BigQuery sample
+    private const val SHAKESPEARE_TABLE = "publicdata:samples.shakespeare"
+    // We'll track words >= this word length across all plays in the table.
+    private const val MIN_WORD_LENGTH = 9
+
+    /**
+     * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH,
+     * outputs word, play_name.
+     */
+    internal class ExtractLargeWordsFn : DoFn<TableRow, KV<String, String>>() {
+        private val smallerWords = Metrics.counter(ExtractLargeWordsFn::class.java, "smallerWords")
+
+        @ProcessElement
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
+            val playName = row["corpus"] as String
+            val word = row["word"] as String
+            if (word.length >= MIN_WORD_LENGTH) {
+                c.output(KV.of(word, playName))
+            } else {
+                // Track how many smaller words we're not including. This information will be
+                // visible in the Monitoring UI.
+                smallerWords.inc()
+            }
+        }
+    }
+
+    /**
+     * Prepares the data for writing to BigQuery by building a TableRow object containing a word with
+     * a string listing the plays in which it appeared.
+     */
+    internal class FormatShakespeareOutputFn : DoFn<KV<String, String>, TableRow>() {
+        @ProcessElement
+        fun processElement(c: ProcessContext) {
+            val row = TableRow().set("word", c.element().key).set("all_plays", c.element().value)
+            c.output(row)
+        }
+    }
+
+    /**
+     * Reads the public 'Shakespeare' data, and for each word in the dataset over a given length,
+     * generates a string containing the list of play names in which that word appears. It does this
+     * via the Combine.perKey transform, with the ConcatWords combine function.
+     *
+     *
+     * Combine.perKey is similar to a GroupByKey followed by a ParDo, but has more restricted
+     * semantics that allow it to be executed more efficiently. These records are then formatted as BQ
+     * table rows.
+     */
+    internal class PlaysForWord : PTransform<PCollection<TableRow>, PCollection<TableRow>>() {
+        override fun expand(rows: PCollection<TableRow>): PCollection<TableRow> {
+
+            // row... => <word, play_name> ...
+            val words = rows.apply(ParDo.of(ExtractLargeWordsFn()))
+
+            // word, play_name => word, all_plays ...
+            val wordAllPlays = words.apply<PCollection<KV<String, String>>>(Combine.perKey(ConcatWords()))
+
+            // <word, all_plays>... => row...
+
+            return wordAllPlays.apply(ParDo.of(FormatShakespeareOutputFn()))
+        }
+    }
+
+    /**
+     * A 'combine function' used with the Combine.perKey transform. Builds a comma-separated string of
+     * all input items. So, it will build a string containing all the different Shakespeare plays in
+     * which the given input word has appeared.
+     */
+    class ConcatWords : SerializableFunction<Iterable<String>, String> {
+        override fun apply(input: Iterable<String>): String {
+            val all = StringBuilder()
+            for (item in input) {
+                if (item.isNotEmpty()) {
+                    if (all.isEmpty()) {
+                        all.append(item)
+                    } else {
+                        all.append(",")
+                        all.append(item)
+                    }
+                }
+            }
+            return all.toString()
+        }
+    }
+
+    /**
+     * Options supported by [CombinePerKeyExamples].
+     *
+     *
+     * Inherits standard configuration options.
+     */
+    interface Options : PipelineOptions {
+        @get:Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
+        @get:Default.String(SHAKESPEARE_TABLE)
+        var input: String
+
+        @get:Description("Table to write to, specified as <project_id>:<dataset_id>.<table_id>. The dataset_id must already exist")
+        @get:Validation.Required
+        var output: String
+    }
+
+    @Throws(Exception::class)
+    @JvmStatic
+    fun main(args: Array<String>) {
+
+        val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as Options
+        val p = Pipeline.create(options)
+
+        // Build the table schema for the output table.
+        val fields = arrayListOf<TableFieldSchema>(
+                TableFieldSchema().setName("word").setType("STRING"),
+                TableFieldSchema().setName("all_plays").setType("STRING")
+        )
+        val schema = TableSchema().setFields(fields)
+
+        p.apply(BigQueryIO.readTableRows().from(options.input))
+                .apply(PlaysForWord())
+                .apply<WriteResult>(
+                        BigQueryIO.writeTableRows()
+                                .to(options.output)
+                                .withSchema(schema)
+                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE))
+
+        p.run().waitUntilFinish()
+    }
+}
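
The heart of PlaysForWord above is Combine.perKey with the ConcatWords SerializableFunction. Here
is a minimal sketch (illustrative only, not part of this change) of the same pattern on an
in-memory keyed collection; the word/play pairs are invented.

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.Combine
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.KV

// Same shape as ConcatWords: fold an Iterable<String> into one comma-separated string.
class ConcatFn : SerializableFunction<Iterable<String>, String> {
    override fun apply(input: Iterable<String>): String =
            input.filter { it.isNotEmpty() }.joinToString(",")
}

fun main() {
    val p = Pipeline.create(PipelineOptionsFactory.create())

    p.apply(Create.of(listOf(
            KV.of("brightness", "kinglear"),
            KV.of("brightness", "othello"),
            KV.of("imagination", "thetempest"))))
            // <word, play_name>... => <word, all_plays>..., as in PlaysForWord.
            .apply(Combine.perKey(ConcatFn()))
            .apply(ParDo.of(object : DoFn<KV<String, String>, Void>() {
                @ProcessElement
                fun processElement(c: ProcessContext) {
                    println("${c.element().key} -> ${c.element().value}")
                }
            }))

    p.run().waitUntilFinish()
}
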
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/DistinctExample.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/DistinctExample.kt
new file mode 100644
index 0000000..f3ea070
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/DistinctExample.kt
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.options.*
+import org.apache.beam.sdk.transforms.Distinct
+import org.apache.beam.sdk.values.PCollection
+
+/**
+ * This example uses as input Shakespeare's plays as plaintext files, and will remove any duplicate
+ * lines across all the files. (The output does not preserve any input order).
+ *
+ * <p>Concepts: the Distinct transform, and how to wire transforms together. Demonstrates {@link
+ * org.apache.beam.sdk.io.TextIO.Read}/ {@link Distinct}/{@link
+ * org.apache.beam.sdk.io.TextIO.Write}.
+ *
+ * <p>To execute this pipeline locally, specify a local output file or output prefix on GCS:
+ * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+ *
+ *  <p>To change the runner, specify:
+ *
+ * <pre>{@code
+ * --runner=YOUR_SELECTED_RUNNER
+ * }</pre>
+ *
+ * See examples/java/README.md for instructions about how to configure different runners.
+ *
+ * <p>The input defaults to {@code gs://apache-beam-samples/shakespeare/\*} and can be overridden
+ * with {@code --input}.
+ *
+ */
+
+object DistinctExample {
+
+    /**
+     * Options supported by [DistinctExample].
+     *
+     *
+     * Inherits standard configuration options.
+     */
+
+    interface Options : PipelineOptions {
+        @get:Description("Path to the directory or GCS prefix containing files to read from")
+        @get:Default.String("gs://apache-beam-samples/shakespeare/*")
+        var input: String
+
+        @get:Description("Path of the file to write to")
+        @get:Default.InstanceFactory(OutputFactory::class)
+        var output: String
+
+        /** Returns "gs://${TEMP_LOCATION}/deduped.txt". */
+        class OutputFactory : DefaultValueFactory<String> {
+            override fun create(options: PipelineOptions): String {
+                options.tempLocation?.let {
+                    return GcsPath.fromUri(it).resolve("deduped.txt").toString()
+                } ?: run {
+                    throw IllegalArgumentException("Must specify --output or --tempLocation")
+                }
+            }
+        }
+    }
+
+    @Throws(Exception::class)
+    @JvmStatic
+    fun main(args: Array<String>) {
+
+        val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as Options
+        val p = Pipeline.create(options)
+
+        p.apply<PCollection<String>>("ReadLines", TextIO.read().from(options.input))
+                .apply(Distinct.create<String>())
+                .apply("DedupedShakespeare", TextIO.write().to(options.output))
+
+        p.run().waitUntilFinish()
+    }
+}
\ No newline at end of file
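
DistinctExample is essentially TextIO.read -> Distinct -> TextIO.write. A minimal sketch
(illustrative only, not part of this change) of the Distinct step on an in-memory input, with no
GCS access required:

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.Distinct
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.ParDo

fun main() {
    val p = Pipeline.create(PipelineOptionsFactory.create())

    p.apply(Create.of(listOf("to be", "or not to be", "to be")))
            // Removes duplicate elements; output order is not preserved.
            .apply(Distinct.create<String>())
            .apply(ParDo.of(object : DoFn<String, Void>() {
                @ProcessElement
                fun processElement(c: ProcessContext) {
                    println(c.element())
                }
            }))

    p.run().waitUntilFinish()
}
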
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt
new file mode 100644
index 0000000..7084726
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableFieldSchema
+import com.google.api.services.bigquery.model.TableRow
+import com.google.api.services.bigquery.model.TableSchema
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
+import org.apache.beam.sdk.options.*
+import org.apache.beam.sdk.transforms.*
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.sdk.values.PCollectionView
+
+import java.util.ArrayList
+import java.util.logging.Logger
+
+/**
+ * This is an example that demonstrates several approaches to filtering, and use of the Mean
+ * transform. It shows how to dynamically set parameters by defining and using new pipeline options,
+ * and how to use a value derived by the pipeline.
+ *
+ *
+ * Concepts: The Mean transform; Options configuration; using pipeline-derived data as a side
+ * input; approaches to filtering, selection, and projection.
+ *
+ *
+ * The example reads public samples of weather data from BigQuery. It performs a projection on
+ * the data, finds the global mean of the temperature readings, filters on readings for a single
+ * given month, and then outputs only data (for that month) that has a mean temp smaller than the
+ * derived global mean.
+ *
+ *
+ * Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ *
+ * To execute this pipeline locally, specify the BigQuery table for the output:
+ *
+ * <pre>`--output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * [--monthFilter=<month_number>]
+`</pre> *
+ *
+ * where optional parameter `--monthFilter` is set to a number 1-12.
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>`--runner=YOUR_SELECTED_RUNNER
+`</pre> *
+ *
+ * See examples/kotlin/README.md for instructions about how to configure different runners.
+ *
+ *
+ * The BigQuery input table defaults to `clouddataflow-readonly:samples.weather_stations`
+ * and can be overridden with `--input`.
+ */
+object FilterExamples {
+    // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
+    private const val WEATHER_SAMPLES_TABLE = "clouddataflow-readonly:samples.weather_stations"
+    internal val LOG = Logger.getLogger(FilterExamples::class.java.name)
+    internal const val MONTH_TO_FILTER = 7
+
+    /**
+     * Examines each row in the input table. Outputs only the subset of the cells this example is
+     * interested in-- the mean_temp and year, month, and day-- as a bigquery table row.
+     */
+    internal class ProjectionFn : DoFn<TableRow, TableRow>() {
+        @ProcessElement
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
+            // Grab year, month, day, mean_temp from the row
+            val year = Integer.parseInt(row["year"] as String)
+            val month = Integer.parseInt(row["month"] as String)
+            val day = Integer.parseInt(row["day"] as String)
+            val meanTemp = row["mean_temp"].toString().toDouble()
+
+            // Prepares the data for writing to BigQuery by building a TableRow object
+            val outRow = TableRow()
+                    .set("year", year)
+                    .set("month", month)
+                    .set("day", day)
+                    .set("mean_temp", meanTemp)
+            c.output(outRow)
+        }
+    }
+
+    /**
+     * Implements 'filter' functionality.
+     *
+     *
+     * Examines each row in the input table. Outputs only rows from the month monthFilter, which is
+     * passed in as a parameter during construction of this DoFn.
+     */
+    internal class FilterSingleMonthDataFn(var monthFilter: Int?) : DoFn<TableRow, TableRow>() {
+
+        @ProcessElement
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
+            val month = row["month"]
+            if (month == this.monthFilter) {
+                c.output(row)
+            }
+        }
+    }
+
+    /**
+     * Examines each row (weather reading) in the input table. Output the temperature reading for that
+     * row ('mean_temp').
+     */
+    internal class ExtractTempFn : DoFn<TableRow, Double>() {
+        @ProcessElement
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
+            val meanTemp = java.lang.Double.parseDouble(row["mean_temp"].toString())
+            c.output(meanTemp)
+        }
+    }
+
+    /**
+     * Finds the global mean of the mean_temp for each day/record, and outputs only data that has a
+     * mean temp smaller than this global mean.
+     */
+    internal class BelowGlobalMean(var monthFilter: Int?) : PTransform<PCollection<TableRow>, PCollection<TableRow>>() {
+
+        override fun expand(rows: PCollection<TableRow>): PCollection<TableRow> {
+
+            // Extract the mean_temp from each row.
+            val meanTemps = rows.apply(ParDo.of(ExtractTempFn()))
+
+            // Find the global mean, of all the mean_temp readings in the weather data,
+            // and prepare this singleton PCollectionView for use as a side input.
+            val globalMeanTemp = meanTemps.apply(Mean.globally()).apply(View.asSingleton())
+
+            // Rows filtered to remove all but a single month
+            val monthFilteredRows = rows.apply(ParDo.of(FilterSingleMonthDataFn(monthFilter)))
+
+            // Then, use the global mean as a side input, to further filter the weather data.
+            // By using a side input to pass in the filtering criteria, we can use a value
+            // that is computed earlier in pipeline execution.
+            // We'll only output readings with temperatures below this mean.
+
+            return monthFilteredRows.apply(
+                    "ParseAndFilter",
+                    ParDo.of(
+                            object : DoFn<TableRow, TableRow>() {
+                                @ProcessElement
+                                fun processElement(c: ProcessContext) {
+                                    val meanTemp = java.lang.Double.parseDouble(c.element()["mean_temp"].toString())
+                                    val gTemp = c.sideInput(globalMeanTemp)
+                                    if (meanTemp < gTemp) {
+                                        c.output(c.element())
+                                    }
+                                }
+                            })
+                            .withSideInputs(globalMeanTemp))
+        }
+    }
+
+    /**
+     * Options supported by [FilterExamples].
+     *
+     *
+     * Inherits standard configuration options.
+     */
+    interface Options : PipelineOptions {
+        @get:Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
+        @get:Default.String(WEATHER_SAMPLES_TABLE)
+        var input: String
+
+        @get:Description("Table to write to, specified as <project_id>:<dataset_id>.<table_id>. The dataset_id must already exist")
+        @get:Validation.Required
+        var output: String
+
+        @get:Description("Numeric value of month to filter on")
+        @get:Default.Integer(MONTH_TO_FILTER)
+        var monthFilter: Int?
+    }
+
+    /** Helper method to build the table schema for the output table.  */
+    private fun buildWeatherSchemaProjection(): TableSchema {
+        val fields = arrayListOf<TableFieldSchema>(
+                TableFieldSchema().setName("year").setType("INTEGER"),
+                TableFieldSchema().setName("month").setType("INTEGER"),
+                TableFieldSchema().setName("day").setType("INTEGER"),
+                TableFieldSchema().setName("mean_temp").setType("FLOAT")
+        )
+        return TableSchema().setFields(fields)
+    }
+
+    @Throws(Exception::class)
+    @JvmStatic
+    fun main(args: Array<String>) {
+
+        val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as Options
+        val p = Pipeline.create(options)
+
+        val schema = buildWeatherSchemaProjection()
+
+        p.apply(BigQueryIO.readTableRows().from(options.input))
+                .apply(ParDo.of(ProjectionFn()))
+                .apply(BelowGlobalMean(options.monthFilter))
+                .apply<WriteResult>(
+                        BigQueryIO.writeTableRows()
+                                .to(options.output)
+                                .withSchema(schema)
+                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE))
+
+        p.run().waitUntilFinish()
+    }
+}
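
The side-input mechanics used by BelowGlobalMean above (computing a singleton mean and reading it
inside a DoFn) can be seen in isolation in the following minimal sketch, which is illustrative
only and not part of this change; the readings are invented stand-ins for mean_temp values.

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.Mean
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.View

fun main() {
    val p = Pipeline.create(PipelineOptionsFactory.create())

    // Stand-in for the mean_temp readings produced by ExtractTempFn.
    val readings = p.apply(Create.of(listOf(40.0, 55.0, 62.0, 48.0)))

    // Compute the global mean once and expose it as a singleton side input.
    val globalMean = readings.apply(Mean.globally()).apply(View.asSingleton())

    readings.apply("KeepBelowMean",
            ParDo.of(object : DoFn<Double, Double>() {
                @ProcessElement
                fun processElement(c: ProcessContext) {
                    // The side input is read per element; only below-mean readings are kept.
                    if (c.element() < c.sideInput(globalMean)) {
                        c.output(c.element())
                    }
                }
            }).withSideInputs(globalMean))
            .apply(ParDo.of(object : DoFn<Double, Void>() {
                @ProcessElement
                fun processElement(c: ProcessContext) {
                    println("below-mean reading: ${c.element()}")
                }
            }))

    p.run().waitUntilFinish()
}
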
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt
new file mode 100644
index 0000000..3b7f3c4
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableRow
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptions
+import org.apache.beam.sdk.options.PipelineOptionsFactory
+import org.apache.beam.sdk.options.Validation
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.transforms.join.CoGbkResult
+import org.apache.beam.sdk.transforms.join.CoGroupByKey
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.sdk.values.PDone
+import org.apache.beam.sdk.values.TupleTag
+
+/**
+ * This example shows how to do a join on two collections. It uses a sample of the GDELT 'world
+ * event' data (http://goo.gl/OB6oin), joining the event 'action' country code against a table that
+ * maps country codes to country names.
+ *
+ *
+ * Concepts: Join operation; multiple input sources.
+ *
+ *
+ * To execute this pipeline locally, specify a local output file or output prefix on GCS:
+ *
+ * <pre>`--output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+`</pre> *
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>`--runner=YOUR_SELECTED_RUNNER
+`</pre> *
+ *
+ * See examples/kotlin/README.md for instructions about how to configure different runners.
+ */
+object JoinExamples {
+
+    // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
+    private const val GDELT_EVENTS_TABLE = "clouddataflow-readonly:samples.gdelt_sample"
+    // A table that maps country codes to country names.
+    private const val COUNTRY_CODES = "gdelt-bq:full.crosswalk_geocountrycodetohuman"
+
+    /** Join two collections, using country code as the key.  */
+    @Throws(Exception::class)
+    internal fun joinEvents(
+            eventsTable: PCollection<TableRow>, countryCodes: PCollection<TableRow>): PCollection<String> {
+
+        val eventInfoTag = TupleTag<String>()
+        val countryInfoTag = TupleTag<String>()
+
+        // transform both input collections to tuple collections, where the keys are country
+        // codes in both cases.
+        val eventInfo = eventsTable.apply(ParDo.of(ExtractEventDataFn()))
+        val countryInfo = countryCodes.apply(ParDo.of(ExtractCountryInfoFn()))
+
+        // country code 'key' -> CGBKR (<event info>, <country name>)
+        val kvpCollection = KeyedPCollectionTuple.of(eventInfoTag, eventInfo)
+                .and(countryInfoTag, countryInfo)
+                .apply(CoGroupByKey.create())
+
+        // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+        // country code 'key' -> string of <event info>, <country name>
+        val finalResultCollection = kvpCollection.apply(
+                "Process",
+                ParDo.of(
+                        object : DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+                            @ProcessElement
+                            fun processElement(c: ProcessContext) {
+                                val e = c.element()
+                                val countryCode = e.key
+                                val countryName = e.value.getOnly(countryInfoTag)
+                                for (ei in c.element().value.getAll(eventInfoTag)) {
+                                    // Generate a string that combines information from both collection values
+                                    c.output(
+                                            KV.of<String, String>(
+                                                    countryCode,
+                                                    "Country name: $countryName, Event info: $ei"))
+                                }
+                            }
+                        }))
+
+        // write to GCS
+        return finalResultCollection.apply(
+                "Format",
+                ParDo.of(
+                        object : DoFn<KV<String, String>, String>() {
+                            @ProcessElement
+                            fun processElement(c: ProcessContext) {
+                                val outputString = "Country code: ${c.element().key}, ${c.element().value}"
+                                c.output(outputString)
+                            }
+                        }))
+    }
+
+    /**
+     * Examines each row (event) in the input table. Output a KV with the key the country code of the
+     * event, and the value a string encoding event information.
+     */
+    internal class ExtractEventDataFn : DoFn<TableRow, KV<String, String>>() {
+        @ProcessElement
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
+            val countryCode = row["ActionGeo_CountryCode"] as String
+            val sqlDate = row["SQLDATE"] as String
+            val actor1Name = row["Actor1Name"] as String
+            val sourceUrl = row["SOURCEURL"] as String
+            val eventInfo = "Date: $sqlDate, Actor1: $actor1Name, url: $sourceUrl"
+            c.output(KV.of(countryCode, eventInfo))
+        }
+    }
+
+    /**
+     * Examines each row (country info) in the input table. Output a KV with the key the country code,
+     * and the value the country name.
+     */
+    internal class ExtractCountryInfoFn : DoFn<TableRow, KV<String, String>>() {
+        @ProcessElement
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
+            val countryCode = row["FIPSCC"] as String
+            val countryName = row["HumanName"] as String
+            c.output(KV.of(countryCode, countryName))
+        }
+    }
+
+    /**
+     * Options supported by [JoinExamples].
+     *
+     * Inherits standard configuration options.
+     */
+    interface Options : PipelineOptions {
+        @get:Description("Path of the file to write to")
+        @get:Validation.Required
+        var output: String
+    }
+
+    @Throws(Exception::class)
+    @JvmStatic
+    fun main(args: Array<String>) {
+        val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as Options
+        val p = Pipeline.create(options)
+        // the following two 'apply' create multiple inputs to our pipeline, one for each
+        // of our two input sources.
+        val eventsTable = p.apply(BigQueryIO.readTableRows().from(GDELT_EVENTS_TABLE))
+        val countryCodes = p.apply(BigQueryIO.readTableRows().from(COUNTRY_CODES))
+        val formattedResults = joinEvents(eventsTable, countryCodes)
+        formattedResults.apply<PDone>(TextIO.write().to(options.output))
+        p.run().waitUntilFinish()
+    }
+}
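
Since CoGroupByKey may be less familiar than a plain GroupByKey, here is a minimal sketch
(illustrative only, not part of this change) of the same tagged join used by joinEvents above,
run on two small in-memory collections with invented values.

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.join.CoGbkResult
import org.apache.beam.sdk.transforms.join.CoGroupByKey
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.TupleTag

fun main() {
    val p = Pipeline.create(PipelineOptionsFactory.create())

    val eventTag = TupleTag<String>()
    val nameTag = TupleTag<String>()

    // Keyed collections standing in for the GDELT events and the country-code table.
    val events = p.apply("Events", Create.of(listOf(
            KV.of("BE", "Date: 20150301, Actor1: POLICE"),
            KV.of("BE", "Date: 20150302, Actor1: PRESS"))))
    val names = p.apply("Names", Create.of(listOf(KV.of("BE", "Belgium"))))

    KeyedPCollectionTuple.of(eventTag, events)
            .and(nameTag, names)
            .apply(CoGroupByKey.create())
            .apply(ParDo.of(object : DoFn<KV<String, CoGbkResult>, Void>() {
                @ProcessElement
                fun processElement(c: ProcessContext) {
                    val countryName = c.element().value.getOnly(nameTag)
                    for (event in c.element().value.getAll(eventTag)) {
                        println("${c.element().key}: $countryName, $event")
                    }
                }
            }))

    p.run().waitUntilFinish()
}
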
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt
new file mode 100644
index 0000000..74d392d
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableFieldSchema
+import com.google.api.services.bigquery.model.TableRow
+import com.google.api.services.bigquery.model.TableSchema
+import org.apache.beam.examples.kotlin.cookbook.MaxPerKeyExamples.FormatMaxesFn
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
+import org.apache.beam.sdk.options.*
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.Max
+import org.apache.beam.sdk.transforms.PTransform
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+
+import java.util.ArrayList
+
+/**
+ * An example that reads the public samples of weather data from BigQuery, and finds the maximum
+ * temperature ('mean_temp') for each month.
+ *
+ *
+ * Concepts: The 'Max' statistical combination function, and how to find the max per key group.
+ *
+ *
+ * Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ *
+ * To execute this pipeline locally, specify the BigQuery table for the output with the form:
+ *
+ * <pre>`--output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+`</pre> *
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>`--runner=YOUR_SELECTED_RUNNER
+`</pre> *
+ *
+ * See examples/java/README.md for instructions about how to configure different runners.
+ *
+ *
+ * The BigQuery input table defaults to `clouddataflow-readonly:samples.weather_stations `
+ * and can be overridden with `--input`.
+ */
+object MaxPerKeyExamples {
+    // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
+    private const val WEATHER_SAMPLES_TABLE = "clouddataflow-readonly:samples.weather_stations"
+
+    /**
+     * Examines each row (weather reading) in the input table. Output the month of the reading, and
+     * the mean_temp.
+     */
+    internal class ExtractTempFn : DoFn<TableRow, KV<Int, Double>>() {
+        @ProcessElement
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
+            val month = Integer.parseInt(row["month"] as String)
+            val meanTemp = java.lang.Double.parseDouble(row["mean_temp"].toString())
+            c.output(KV.of(month, meanTemp))
+        }
+    }
+
+    /** Format the results to a TableRow, to save to BigQuery.  */
+    internal class FormatMaxesFn : DoFn<KV<Int, Double>, TableRow>() {
+        @ProcessElement
+        fun processElement(c: ProcessContext) {
+            val row = TableRow()
+                    .set("month", c.element().key)
+                    .set("max_mean_temp", c.element().value)
+            c.output(row)
+        }
+    }
+
+    /**
+     * Reads rows from a weather data table, and finds the max mean_temp for each month via the 'Max'
+     * statistical combination function.
+     */
+    internal class MaxMeanTemp : PTransform<PCollection<TableRow>, PCollection<TableRow>>() {
+        override fun expand(rows: PCollection<TableRow>): PCollection<TableRow> {
+
+            // row... => <month, mean_temp> ...
+            val temps = rows.apply(ParDo.of(ExtractTempFn()))
+
+            // month, mean_temp... => <month, max mean temp>...
+            val tempMaxes = temps.apply(Max.doublesPerKey())
+
+            // <month, max>... => row...
+
+            return tempMaxes.apply(ParDo.of(FormatMaxesFn()))
+        }
+    }
+
+    /**
+     * Options supported by [MaxPerKeyExamples].
+     *
+     *
+     * Inherits standard configuration options.
+     */
+    interface Options : PipelineOptions {
+        @get:Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
+        @get:Default.String(WEATHER_SAMPLES_TABLE)
+        var input: String
+
+        @get:Description("Table to write to, specified as <project_id>:<dataset_id>.<table_id>")
+        @get:Validation.Required
+        var output: String
+    }
+
+    @Throws(Exception::class)
+    @JvmStatic
+    fun main(args: Array<String>) {
+
+        val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as Options
+        val p = Pipeline.create(options)
+
+        // Build the table schema for the output table.
+        val fields = arrayListOf<TableFieldSchema>(
+                TableFieldSchema().setName("month").setType("INTEGER"),
+                TableFieldSchema().setName("max_mean_temp").setType("FLOAT"))
+
+        val schema = TableSchema().setFields(fields)
+
+        p.apply(BigQueryIO.readTableRows().from(options.input))
+                .apply(MaxMeanTemp())
+                .apply<WriteResult>(
+                        BigQueryIO.writeTableRows()
+                                .to(options.output)
+                                .withSchema(schema)
+                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE))
+
+        p.run().waitUntilFinish()
+    }
+}
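
MaxMeanTemp above relies on the built-in Max.doublesPerKey combine. A minimal sketch (illustrative
only, not part of this change) of that step on invented <month, mean_temp> pairs:

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.Max
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV

fun main() {
    val p = Pipeline.create(PipelineOptionsFactory.create())

    // <month, mean_temp> pairs standing in for ExtractTempFn output.
    p.apply(Create.of(listOf(KV.of(6, 75.2), KV.of(6, 81.5), KV.of(7, 79.9))))
            // month, mean_temp... => <month, max mean temp>..., as in MaxMeanTemp.
            .apply(Max.doublesPerKey())
            .apply(ParDo.of(object : DoFn<KV<Int, Double>, Void>() {
                @ProcessElement
                fun processElement(c: ProcessContext) {
                    println("month=${c.element().key} max_mean_temp=${c.element().value}")
                }
            }))

    p.run().waitUntilFinish()
}
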
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/README.md b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/README.md
new file mode 100644
index 0000000..a405808
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/README.md
@@ -0,0 +1,71 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# "Cookbook" Examples
+
+This directory holds simple "cookbook" examples, which show how to define
+commonly-used data analysis patterns that you would likely incorporate into a
+larger Apache Beam pipeline. They include:
+
+ <ul>
+  <li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java">BigQueryTornadoes</a>
+  &mdash; An example that reads the public samples of weather data from Google
+  BigQuery, counts the number of tornadoes that occur in each month, and
+  writes the results to BigQuery. Demonstrates reading/writing BigQuery,
+  counting a <code>PCollection</code>, and user-defined <code>PTransforms</code>.</li>
+  <li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java">CombinePerKeyExamples</a>
+  &mdash; An example that reads the public &quot;Shakespeare&quot; data, and for
+  each word in the dataset that exceeds a given length, generates a string
+  containing the list of play names in which that word appears.
+  Demonstrates the <code>Combine.perKey</code>
+  transform, which lets you combine the values in a key-grouped
+  <code>PCollection</code>.
+  </li>
+  <li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java">DistinctExample</a>
+  &mdash; An example that uses Shakespeare's plays as plain text files, and
+  removes duplicate lines across all the files. Demonstrates the
+  <code>Distinct</code>, <code>TextIO.Read</code>,
+  and <code>TextIO.Write</code> transforms, and how to wire transforms together.
+  </li>
+  <li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java">FilterExamples</a>
+  &mdash; An example that shows different approaches to filtering, including
+  selection and projection. It also shows how to dynamically set parameters
+  by defining and using new pipeline options, and how to use a value derived
+  by a pipeline. Demonstrates the <code>Mean</code> transform,
+  <code>Options</code> configuration, and using pipeline-derived data as a side
+  input.
+  </li>
+  <li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java">JoinExamples</a>
+  &mdash; An example that shows how to join two collections. It uses a
+  sample of the <a href="http://goo.gl/OB6oin">GDELT &quot;world event&quot;
+  data</a>, joining the event <code>action</code> country code against a table
+  that maps country codes to country names. Demonstrates the <code>Join</code>
+  operation, and using multiple input sources.
+  </li>
+  <li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java">MaxPerKeyExamples</a>
+  &mdash; An example that reads the public samples of weather data from BigQuery,
+  and finds the maximum temperature (<code>mean_temp</code>) for each month.
+  Demonstrates the <code>Max</code> statistical combination transform, and how to
+  find the max-per-key group.
+  </li>
+  </ul>
+
+See the [documentation](http://beam.apache.org/get-started/quickstart/) and the [Examples
+README](../../../../../../../../README.md) for
+information about how to run these examples.
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
new file mode 100644
index 0000000..1f9a008
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.cookbook
+
+import com.google.api.services.bigquery.model.TableFieldSchema
+import com.google.api.services.bigquery.model.TableReference
+import com.google.api.services.bigquery.model.TableRow
+import com.google.api.services.bigquery.model.TableSchema
+import org.apache.beam.examples.kotlin.common.ExampleBigQueryTableOptions
+import org.apache.beam.examples.kotlin.common.ExampleOptions
+import org.apache.beam.examples.kotlin.common.ExampleUtils
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptionsFactory
+import org.apache.beam.sdk.options.StreamingOptions
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.GroupByKey
+import org.apache.beam.sdk.transforms.PTransform
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.transforms.windowing.*
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.sdk.values.PCollectionList
+import org.joda.time.Duration
+import org.joda.time.Instant
+
+import java.util.Random
+import java.util.concurrent.TimeUnit
+
+/**
+ * This example illustrates the basic concepts behind triggering. It shows how to use different
+ * trigger definitions to produce partial (speculative) results before all the data is processed and
+ * to control when updated results are produced for late data. The example performs a streaming
+ * analysis of the data coming in from a text file and writes the results to BigQuery. It divides
+ * the data into [windows][Window] to be processed, and demonstrates using various kinds of
+ * [triggers][org.apache.beam.sdk.transforms.windowing.Trigger] to control when the results for
+ * each window are emitted.
+ *
+ *
+ * This example uses a portion of real traffic data from San Diego freeways. It contains readings
+ * from sensor stations set up along each freeway. Each sensor reading includes a calculation of the
+ * 'total flow' across all lanes in that freeway direction.
+ *
+ *
+ * Concepts:
+ *
+ * <pre>
+ * 1. The default triggering behavior
+ * 2. Late data with the default trigger
+ * 3. How to get speculative estimates
+ * 4. Combining late data and speculative estimates
+</pre> *
+ *
+ *
+ * Before running this example, it will be useful to familiarize yourself with Beam triggers and
+ * understand the concept of 'late data'. See: [
+ * https://beam.apache.org/documentation/programming-guide/#triggers](https://beam.apache.org/documentation/programming-guide/#triggers)
+ *
+ *
+ * The example is configured to use the default BigQuery table from the example common package
+ * (there are no defaults for a general Beam pipeline). You can override them by using the `--bigQueryDataset` and `--bigQueryTable` options. If the BigQuery table does not exist, the
+ * example will try to create it.
+ *
+ *
+ * The pipeline outputs its results to a BigQuery table. Here are some queries you can use to see
+ * interesting results. Replace `<enter_table_name>` in the queries below with the name of the
+ * BigQuery table, and replace `<enter_window_interval>` with the window
+ * interval.
+ *
+ *
+ * To see the results of the default trigger, use the query below. Note: when you start up your
+ * pipeline, you'll initially see results from 'late' data. Wait until after the window duration,
+ * when the first pane of non-late data has been emitted, to see more interesting results. `SELECT * FROM
+ * <enter_table_name> WHERE trigger_type = "default" ORDER BY window DESC`
+ *
+ *
+ * To see the late data, i.e. the data dropped by the default trigger, `SELECT * FROM
+ * <enter_table_name> WHERE trigger_type = "withAllowedLateness" and (timing = "LATE" or timing =
+ * "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time`
+ *
+ *
+ * To see the difference between accumulation mode and discarding mode, `SELECT * FROM
+ * <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND (trigger_type =
+ * "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY window DESC,
+ * processing_time`
+ *
+ *
+ * To see speculative results every minute, `SELECT * FROM <enter_table_name> WHERE
+ * trigger_type = "speculative" and freeway = "5" ORDER BY window DESC, processing_time`
+ *
+ *
+ * To see speculative results every five minutes after the end of the window, `SELECT * FROM
+ * <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY" and freeway = "5"
+ * ORDER BY window DESC, processing_time`
+ *
+ *
+ * To see the first and the last pane for a freeway in a window for all the trigger types, `SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window`
+ *
+ *
+ * To reduce the number of results for each query, we can add additional WHERE clauses. For
+ * example, to see the results of the default trigger, `SELECT * FROM <enter_table_name>
+ * WHERE trigger_type = "default" AND freeway = "5" AND window = "<enter_window_interval>"`
+ *
+ *
+ * The example will try to cancel the pipeline on the signal to terminate the process (CTRL-C) and
+ * then exit.
+ */
+object TriggerExample {
+    // Numeric value of fixed window duration, in minutes
+    const val WINDOW_DURATION = 30
+    // Constants used in triggers.
+    // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results.
+    // ONE_MINUTE is used only with processing time before the end of the window
+    val ONE_MINUTE: Duration = Duration.standardMinutes(1)
+    // FIVE_MINUTES is used only with processing time after the end of the window
+    val FIVE_MINUTES: Duration = Duration.standardMinutes(5)
+    // ONE_DAY is used to specify the amount of lateness allowed for the data elements.
+    val ONE_DAY: Duration = Duration.standardDays(1)
+
+    /** Defines the BigQuery schema used for the output.  */
+    private val schema: TableSchema
+        get() {
+            val fields = arrayListOf<TableFieldSchema>(
+                    TableFieldSchema().setName("trigger_type").setType("STRING"),
+                    TableFieldSchema().setName("freeway").setType("STRING"),
+                    TableFieldSchema().setName("total_flow").setType("INTEGER"),
+                    TableFieldSchema().setName("number_of_records").setType("INTEGER"),
+                    TableFieldSchema().setName("window").setType("STRING"),
+                    TableFieldSchema().setName("isFirst").setType("BOOLEAN"),
+                    TableFieldSchema().setName("isLast").setType("BOOLEAN"),
+                    TableFieldSchema().setName("timing").setType("STRING"),
+                    TableFieldSchema().setName("event_time").setType("TIMESTAMP"),
+                    TableFieldSchema().setName("processing_time").setType("TIMESTAMP")
+            )
+            return TableSchema().setFields(fields)
+        }
+
+    /**
+     * This transform demonstrates using triggers to control when data is produced for each window.
+     * Consider an example to understand the results generated by each type of trigger. The example
+     * uses "freeway" as the key. Event time is the timestamp associated with the data element and
+     * processing time is the time when the data element gets processed in the pipeline. For freeway
+     * 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window:
+     *
+     * <pre>
+     * Key (freeway) | Value (total_flow) | event time | processing time
+     * 5             | 50                 | 10:00:03   | 10:00:47
+     * 5             | 30                 | 10:01:00   | 10:01:03
+     * 5             | 30                 | 10:02:00   | 11:07:00
+     * 5             | 20                 | 10:04:10   | 10:05:15
+     * 5             | 60                 | 10:05:00   | 11:03:00
+     * 5             | 20                 | 10:05:01   | 11:07:30
+     * 5             | 60                 | 10:15:00   | 10:27:15
+     * 5             | 40                 | 10:26:40   | 10:26:43
+     * 5             | 60                 | 10:27:20   | 10:27:25
+     * 5             | 60                 | 10:29:00   | 11:11:00
+     * </pre>
+     *
+     *
+     * Beam tracks a watermark which records up to what point in event time the data is complete.
+     * For the purposes of the example, we'll assume the watermark is approximately 15m behind the
+     * current processing time. In practice, the actual value would vary over time based on the
+     * system's knowledge of the current delay and contents of the backlog (data that has not yet been
+     * processed).
+     *
+     *
+     * If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
+     * close at 10:44:59, when the watermark passes 10:30:00.
+     */
+    internal class CalculateTotalFlow(private val windowDuration: Int) : PTransform<PCollection<KV<String, Int>>, PCollectionList<TableRow>>() {
+
+        override fun expand(flowInfo: PCollection<KV<String, Int>>): PCollectionList<TableRow> {
+
+            // Concept #1: The default triggering behavior
+            // By default Beam uses a trigger which fires when the watermark has passed the end of the
+            // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+
+            // The system also defaults to dropping late data -- data which arrives after the watermark
+            // has passed the event timestamp of the arriving element. This means that the default trigger
+            // will only fire once.
+
+            // Each pane produced by the default trigger with no allowed lateness will be the first and
+            // last pane in the window, and will be ON_TIME.
+
+            // The results for the example above with the default trigger and zero allowed lateness
+            // would be:
+            // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+            // 5             | 260                | 6                 | true    | true   | ON_TIME
+
+            // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
+            // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
+            // late, and dropped.
+
+            val defaultTriggerResults = flowInfo
+                    .apply(
+                            "Default",
+                            Window
+                                    // The default window duration values work well if you're running the
+                                    // default input file. You may want to adjust the window duration otherwise.
+                                    .into<KV<String, Int>>(
+                                            FixedWindows.of(Duration.standardMinutes(windowDuration.toLong())))
+                                    // The default trigger first emits output when the system's watermark
+                                    // passes the end of the window.
+                                    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+                                    // Late data is dropped
+                                    .withAllowedLateness(Duration.ZERO)
+                                    // Discard elements after emitting each pane.
+                                    // With no allowed lateness and the specified trigger there will only be a
+                                    // single pane, so this doesn't have a noticeable effect. See concept 2 for
+                                    // more details.
+                                    .discardingFiredPanes())
+                    .apply(TotalFlow("default"))
+
+            // Concept #2: Late data with the default trigger
+            // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This
+            // leads to each window staying open for ONE_DAY after the watermark has passed the end of the
+            // window. Any late data will result in an additional pane being fired for that same window.
+
+            // The first pane produced will be ON_TIME and the remaining panes will be LATE.
+            // To definitely get the last pane when the window closes, use
+            // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
+
+            // The results for the example above with the default trigger and ONE_DAY allowed lateness
+            // would be:
+            // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+            // 5             | 260                | 6                 | true    | false  | ON_TIME
+            // 5             | 60                 | 1                 | false   | false  | LATE
+            // 5             | 30                 | 1                 | false   | false  | LATE
+            // 5             | 20                 | 1                 | false   | false  | LATE
+            // 5             | 60                 | 1                 | false   | false  | LATE
+            val withAllowedLatenessResults = flowInfo
+                    .apply(
+                            "WithLateData",
+                            Window.into<KV<String, Int>>(
+                                    FixedWindows.of(Duration.standardMinutes(windowDuration.toLong())))
+                                    // Late data is emitted as it arrives
+                                    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+                                    // Once the output is produced, the pane is dropped and we start preparing
+                                    // the next pane for the window.
+                                    .discardingFiredPanes()
+                                    // Late data is handled up to one day
+                                    .withAllowedLateness(ONE_DAY))
+                    .apply(TotalFlow("withAllowedLateness"))
+
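+            // Variant noted in the comment above (illustrative only, not wired into the results of
+            // this example): to also get a final -- possibly empty -- pane when the window closes,
+            // the allowed-lateness line would become
+            //     .withAllowedLateness(ONE_DAY, Window.ClosingBehavior.FIRE_ALWAYS)
+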
+            // Concept #3: How to get speculative estimates
+            // We can specify a trigger that fires independent of the watermark, for instance after
+            // ONE_MINUTE of processing time. This allows us to produce speculative estimates before
+            // all the data is available. Since we don't have any triggers that depend on the watermark
+            // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE.
+
+            // We also use accumulatingFiredPanes to build up the results across each pane firing.
+
+            // The results for the example above for this trigger would be:
+            // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+            // 5             | 80                 | 2                 | true    | false  | EARLY
+            // 5             | 100                | 3                 | false   | false  | EARLY
+            // 5             | 260                | 6                 | false   | false  | EARLY
+            // 5             | 320                | 7                 | false   | false  | LATE
+            // 5             | 370                | 9                 | false   | false  | LATE
+            // 5             | 430                | 10                | false   | false  | LATE
+            val speculativeResults = flowInfo
+                    .apply(
+                            "Speculative",
+                            Window.into<KV<String, Int>>(
+                                    FixedWindows.of(Duration.standardMinutes(windowDuration.toLong())))
+                                    // Trigger fires every minute.
+                                    .triggering(
+                                            Repeatedly.forever(
+                                                    AfterProcessingTime.pastFirstElementInPane()
+                                                            // Speculative every ONE_MINUTE
+                                                            .plusDelayOf(ONE_MINUTE)))
+                                    // After emitting each pane, it will continue accumulating the elements so
+                                    // that each approximation includes all of the previous data in addition to
+                                    // the newly arrived data.
+                                    .accumulatingFiredPanes()
+                                    .withAllowedLateness(ONE_DAY))
+                    .apply(TotalFlow("speculative"))
+
+            // Concept #4: Combining late data and speculative estimates
+            // We can put the previous concepts together to get EARLY estimates, an ON_TIME result,
+            // and LATE updates based on late data.
+
+            // Each time a triggering condition is satisfied it advances to the next trigger.
+            // If there are new elements, this trigger emits a pane under the following conditions:
+            // > Early approximations every minute until the end of the window.
+            // > An on-time firing when the watermark has passed the end of the window.
+            // > A firing every five minutes while late data keeps arriving.
+
+            // Every pane produced will either be EARLY, ON_TIME or LATE.
+
+            // The results for the example above for this trigger would be:
+            // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+            // 5             | 80                 | 2                 | true    | false  | EARLY
+            // 5             | 100                | 3                 | false   | false  | EARLY
+            // 5             | 260                | 6                 | false   | false  | EARLY
+            // [First pane fired after the end of the window]
+            // 5             | 320                | 7                 | false   | false  | ON_TIME
+            // 5             | 430                | 10                | false   | false  | LATE
+
+            // For more possibilities of how to build advanced triggers, see {@link Trigger}.
+            val sequentialResults = flowInfo
+                    .apply(
+                            "Sequential",
+                            Window.into<KV<String, Int>>(
+                                    FixedWindows.of(Duration.standardMinutes(windowDuration.toLong())))
+                                    .triggering(
+                                            AfterEach.inOrder(
+                                                    Repeatedly.forever(
+                                                            AfterProcessingTime.pastFirstElementInPane()
+                                                                    // Speculative every ONE_MINUTE
+                                                                    .plusDelayOf(ONE_MINUTE))
+                                                            .orFinally(AfterWatermark.pastEndOfWindow()),
+                                                    Repeatedly.forever(
+                                                            AfterProcessingTime.pastFirstElementInPane()
+                                                                    // Late data every FIVE_MINUTES
+                                                                    .plusDelayOf(FIVE_MINUTES))))
+                                    .accumulatingFiredPanes()
+                                    // For up to ONE_DAY
+                                    .withAllowedLateness(ONE_DAY))
+                    .apply(TotalFlow("sequential"))
+
+            // Adds the results generated by each trigger type to a PCollectionList.
+
+            return PCollectionList.of<TableRow>(defaultTriggerResults)
+                    .and(withAllowedLatenessResults)
+                    .and(speculativeResults)
+                    .and(sequentialResults)
+        }
+    }
+
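+    /**
+     * Not part of the original example: a sketch showing how the early and late firings from
+     * concepts #3 and #4 can also be expressed with the composite
+     * [AfterWatermark.pastEndOfWindow] builder instead of [AfterEach.inOrder]. It reuses the
+     * duration constants above and is not applied anywhere in this pipeline.
+     */
+    @Suppress("unused")
+    internal fun compositeTriggerSketch(windowDuration: Int): Window<KV<String, Int>> =
+            Window.into<KV<String, Int>>(
+                    FixedWindows.of(Duration.standardMinutes(windowDuration.toLong())))
+                    .triggering(
+                            AfterWatermark.pastEndOfWindow()
+                                    // Speculative pane every ONE_MINUTE before the watermark passes the window end.
+                                    .withEarlyFirings(
+                                            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
+                                    // Late pane every FIVE_MINUTES while late data keeps arriving.
+                                    .withLateFirings(
+                                            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)))
+                    .accumulatingFiredPanes()
+                    // Accept late data for up to ONE_DAY, as in the concepts above.
+                    .withAllowedLateness(ONE_DAY)
+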
+    //////////////////////////////////////////////////////////////////////////////////////////////////
+    // The remaining parts of the pipeline are needed to produce the output for each
+    // concept above. Not directly relevant to understanding the trigger examples.
+
+    /**
+     * Calculate total flow and number of records for each freeway and format the results to TableRow
+     * objects, to save to BigQuery.
+     */
+    internal class TotalFlow(private val triggerType: String) : PTransform<PCollection<KV<String, Int>>, PCollection<TableRow>>() {
+
+        override fun expand(flowInfo: PCollection<KV<String, Int>>): PCollection<TableRow> {
+            val flowPerFreeway = flowInfo.apply(GroupByKey.create())
+
+            val results = flowPerFreeway.apply(
+                    ParDo.of(
+                            object : DoFn<KV<String, Iterable<Int>>, KV<String, String>>() {
+
+                                @ProcessElement
+                                @Throws(Exception::class)
+                                fun processElement(c: ProcessContext) {
+                                    val flows = c.element().value
+                                    var sum = 0
+                                    var numberOfRecords = 0L
+                                    for (value in flows) {
+                                        sum += value
+                                        numberOfRecords++
+                                    }
+                                    c.output(KV.of<String, String>(c.element().key, "$sum,$numberOfRecords"))
+                                }
+                            }))
+            return results.apply(ParDo.of(FormatTotalFlow(triggerType)))
+        }
+    }
+
+    /**
+     * Format the results of the Total flow calculation to a TableRow, to save to BigQuery. Adds the
+     * triggerType, pane information, processing time and the window timestamp.
+     */
+    internal class FormatTotalFlow(private val triggerType: String) : DoFn<KV<String, String>, TableRow>() {
+
+        @ProcessElement
+        @Throws(Exception::class)
+        fun processElement(c: ProcessContext, window: BoundedWindow) {
+            val values = c.element().value.split(",".toRegex()).toTypedArray()
+            val row = TableRow()
+                    .set("trigger_type", triggerType)
+                    .set("freeway", c.element().key)
+                    .set("total_flow", Integer.parseInt(values[0]))
+                    .set("number_of_records", java.lang.Long.parseLong(values[1]))
+                    .set("window", window.toString())
+                    .set("isFirst", c.pane().isFirst)
+                    .set("isLast", c.pane().isLast)
+                    .set("timing", c.pane().timing.toString())
+                    .set("event_time", c.timestamp().toString())
+                    .set("processing_time", Instant.now().toString())
+            c.output(row)
+        }
+    }
+
+    /**
+     * Extract the freeway and total flow in a reading. Freeway is used as key since we are
+     * calculating the total flow for each freeway.
+     */
+    internal class ExtractFlowInfo : DoFn<String, KV<String, Int>>() {
+
+        @ProcessElement
+        @Throws(Exception::class)
+        fun processElement(c: ProcessContext) {
+            val laneInfo = c.element().split(",".toRegex()).toTypedArray()
+            if ("timestamp" == laneInfo[0]) {
+                // Header row
+                return
+            }
+            if (laneInfo.size < VALID_NUM_FIELDS) {
+                // Skip the invalid input.
+                return
+            }
+            val freeway = laneInfo[2]
+            val totalFlow = tryIntegerParse(laneInfo[7])
+            // Ignore the records with total flow 0 to easily understand the working of triggers.
+            // Skip the records with total flow -1 since they are invalid input.
+            if (totalFlow == null || totalFlow <= 0) {
+                return
+            }
+            c.output(KV.of<String, Int>(freeway, totalFlow))
+        }
+
+        companion object {
+            private const val VALID_NUM_FIELDS = 50
+        }
+    }
+
+    /** Inherits standard configuration options.  */
+    interface TrafficFlowOptions : ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
+
+        @get:Description("Input file to read from")
+        @get:Default.String("gs://apache-beam-samples/traffic_sensor/Freeways-5Minaa2010-01-01_to_2010-02-15.csv")
+        var input: String
+
+        @get:Description("Numeric value of window duration for fixed windows, in minutes")
+        @get:Default.Integer(WINDOW_DURATION)
+        var windowDuration: Int?
+    }
+
+    @Throws(Exception::class)
+    @JvmStatic
+    fun main(args: Array<String>) {
+        val options = PipelineOptionsFactory.fromArgs(*args).withValidation() as TrafficFlowOptions
+        options.isStreaming = true
+
+        options.bigQuerySchema = schema
+
+        val exampleUtils = ExampleUtils(options)
+        exampleUtils.setup()
+
+        val pipeline = Pipeline.create(options)
+
+        val tableRef = getTableReference(
+                options.project, options.bigQueryDataset, options.bigQueryTable)
+
+        val resultList = pipeline
+                .apply("ReadMyFile", TextIO.read().from(options.input))
+                .apply("InsertRandomDelays", ParDo.of(InsertDelays()))
+                .apply(ParDo.of(ExtractFlowInfo()))
+                .apply(CalculateTotalFlow(options.windowDuration!!))
+
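+        // All four trigger variants are written to the same BigQuery table; the trigger_type
+        // column distinguishes them in the queries shown in the class-level documentation.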
+        for (i in 0 until resultList.size()) {
+            resultList.get(i).apply<WriteResult>(BigQueryIO.writeTableRows().to(tableRef).withSchema(schema))
+        }
+
+        val result = pipeline.run()
+
+        // ExampleUtils will try to cancel the pipeline and the injector before the program exits.
+        exampleUtils.waitToFinish(result)
+    }
+
+    /** Add current time to each record. Also insert a delay at random to demo the triggers.  */
+    class InsertDelays : DoFn<String, String>() {
+
+        @ProcessElement
+        @Throws(Exception::class)
+        fun processElement(c: ProcessContext) {
+            var timestamp = Instant.now()
+            val random = Random()
+            if (random.nextDouble() < THRESHOLD) {
+                val range = MAX_DELAY - MIN_DELAY
+                val delayInMinutes = random.nextInt(range) + MIN_DELAY
+                val delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes.toLong())
+                timestamp = Instant(timestamp.millis - delayInMillis)
+            }
+            c.outputWithTimestamp(c.element(), timestamp)
+        }
+
+        companion object {
+            private const val THRESHOLD = 0.001
+            // MIN_DELAY and MAX_DELAY in minutes.
+            private const val MIN_DELAY = 1
+            private const val MAX_DELAY = 100
+        }
+    }
+
+    /** Sets the table reference.  */
+    private fun getTableReference(project: String, dataset: String, table: String): TableReference {
+        return TableReference().apply {
+            projectId = project
+            datasetId = dataset
+            tableId = table
+        }
+    }
+
+    private fun tryIntegerParse(number: String): Int? = number.toIntOrNull()
+}
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
new file mode 100644
index 0000000..6af6ff1
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.snippets
+
+import com.google.api.services.bigquery.model.*
+import com.google.common.collect.ImmutableList
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.coders.AvroCoder
+import org.apache.beam.sdk.coders.DefaultCoder
+import org.apache.beam.sdk.coders.DoubleCoder
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition
+import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations
+import org.apache.beam.sdk.io.gcp.bigquery.TableDestination
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
+import org.apache.beam.sdk.transforms.*
+import org.apache.beam.sdk.transforms.join.CoGbkResult
+import org.apache.beam.sdk.transforms.join.CoGroupByKey
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple
+import org.apache.beam.sdk.values.*
+
+/** Code snippets used in webdocs.  */
+object Snippets {
+
+    @DefaultCoder(AvroCoder::class)
+    internal class Quote(
+            val source: String = "",
+            val quote: String = ""
+    )
+
+    @DefaultCoder(AvroCoder::class)
+    internal class WeatherData(
+            val year: Long = 0,
+            val month: Long = 0,
+            val day: Long = 0,
+            val maxTemp: Double = 0.0
+    )
+
+    /** Using a Read and Write transform to read/write from/to BigQuery.  */
+    @JvmOverloads
+    @SuppressFBWarnings("SE_BAD_FIELD")
+    // FindBugs (SE_BAD_FIELD) flags the non-serializable pipeline object being used inside the run {} blocks.
+    fun modelBigQueryIO(
+            pipeline: Pipeline, writeProject: String = "", writeDataset: String = "", writeTable: String = "") {
+        run {
+            // [START BigQueryTableSpec]
+            val tableSpec = "clouddataflow-readonly:samples.weather_stations"
+            // [END BigQueryTableSpec]
+        }
+
+        run {
+            // [START BigQueryTableSpecWithoutProject]
+            val tableSpec = "samples.weather_stations"
+            // [END BigQueryTableSpecWithoutProject]
+        }
+
+        run {
+            // [START BigQueryTableSpecObject]
+            val tableSpec = TableReference()
+                    .setProjectId("clouddataflow-readonly")
+                    .setDatasetId("samples")
+                    .setTableId("weather_stations")
+            // [END BigQueryTableSpecObject]
+        }
+
+        run {
+            val tableSpec = "clouddataflow-readonly:samples.weather_stations"
+            // [START BigQueryReadTable]
+            val maxTemperatures = pipeline.apply(BigQueryIO.readTableRows().from(tableSpec))
+                    // Each row is of type TableRow
+                    .apply<PCollection<Double>>(
+                            MapElements.into(TypeDescriptors.doubles())
+                                    .via(SerializableFunction<TableRow, Double> {
+                                        it["max_temperature"] as Double
+                                    })
+                    )
+            // [END BigQueryReadTable]
+        }
+
+        run {
+            val tableSpec = "clouddataflow-readonly:samples.weather_stations"
+            // [START BigQueryReadFunction]
+            val maxTemperatures = pipeline.apply(
+                    BigQueryIO.read { it.record["max_temperature"] as Double }
+                            .from(tableSpec)
+                            .withCoder(DoubleCoder.of()))
+            // [END BigQueryReadFunction]
+        }
+
+        run {
+            // [START BigQueryReadQuery]
+            val maxTemperatures = pipeline.apply(
+                    BigQueryIO.read { it.record["max_temperature"] as Double }
+                            .fromQuery(
+                                    "SELECT max_temperature FROM [clouddataflow-readonly:samples.weather_stations]")
+                            .withCoder(DoubleCoder.of()))
+            // [END BigQueryReadQuery]
+        }
+
+        run {
+            // [START BigQueryReadQueryStdSQL]
+            val maxTemperatures = pipeline.apply(
+                    BigQueryIO.read { it.record["max_temperature"] as Double }
+                            .fromQuery(
+                                    "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
+                            .usingStandardSql()
+                            .withCoder(DoubleCoder.of()))
+            // [END BigQueryReadQueryStdSQL]
+        }
+
+        // [START BigQuerySchemaJson]
+        val tableSchemaJson = """
+            {
+              "fields": [
+                {
+                  "name": "source",
+                  "type": "STRING",
+                  "mode": "NULLABLE"
+                },
+                {
+                  "name": "quote",
+                  "type": "STRING",
+                  "mode": "REQUIRED"
+                }
+              ]
+            }
+        """.trimIndent()
+        // [END BigQuerySchemaJson]
+
+        run {
+            var tableSpec = "clouddataflow-readonly:samples.weather_stations"
+            if (writeProject.isNotEmpty() && writeDataset.isNotEmpty() && writeTable.isNotEmpty()) {
+                tableSpec = "$writeProject:$writeDataset.$writeTable"
+            }
+
+            // [START BigQuerySchemaObject]
+            val tableSchema = TableSchema()
+                    .setFields(
+                            ImmutableList.of(
+                                    TableFieldSchema()
+                                            .setName("source")
+                                            .setType("STRING")
+                                            .setMode("NULLABLE"),
+                                    TableFieldSchema()
+                                            .setName("quote")
+                                            .setType("STRING")
+                                            .setMode("REQUIRED")))
+            // [END BigQuerySchemaObject]
+
+            // [START BigQueryWriteInput]
+            /*
+                @DefaultCoder(AvroCoder::class)
+                class Quote(
+                    val source: String = "",
+                    val quote: String = ""
+            )
+             */
+
+
+            val quotes = pipeline.apply(
+                    Create.of(
+                            Quote("Mahatma Gandhi", "My life is my message."),
+                            Quote("Yoda", "Do, or do not. There is no 'try'.")))
+            // [END BigQueryWriteInput]
+
+            // [START BigQueryWriteTable]
+            quotes
+                    .apply<PCollection<TableRow>>(
+                            MapElements.into(TypeDescriptor.of(TableRow::class.java))
+                                    .via(SerializableFunction<Quote, TableRow> { TableRow().set("source", it.source).set("quote", it.quote) }))
+                    .apply<WriteResult>(
+                            BigQueryIO.writeTableRows()
+                                    .to(tableSpec)
+                                    .withSchema(tableSchema)
+                                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                                    .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))
+            // [END BigQueryWriteTable]
+
+            // [START BigQueryWriteFunction]
+            quotes.apply<WriteResult>(
+                    BigQueryIO.write<Quote>()
+                            .to(tableSpec)
+                            .withSchema(tableSchema)
+                            .withFormatFunction { TableRow().set("source", it.source).set("quote", it.quote) }
+                            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))
+            // [END BigQueryWriteFunction]
+
+            // [START BigQueryWriteJsonSchema]
+            quotes.apply<WriteResult>(
+                    BigQueryIO.write<Quote>()
+                            .to(tableSpec)
+                            .withJsonSchema(tableSchemaJson)
+                            .withFormatFunction { TableRow().set("source", it.source).set("quote", it.quote) }
+                            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))
+            // [END BigQueryWriteJsonSchema]
+        }
+
+        run {
+            // [START BigQueryWriteDynamicDestinations]
+
+            /*
+            @DefaultCoder(AvroCoder::class)
+            class WeatherData(
+                    val year: Long = 0,
+                    val month: Long = 0,
+                    val day: Long = 0,
+                    val maxTemp: Double = 0.0
+            )
+            */
+            val weatherData = pipeline.apply(
+                    BigQueryIO.read {
+                        val record = it.record
+                        WeatherData(
+                                record.get("year") as Long,
+                                record.get("month") as Long,
+                                record.get("day") as Long,
+                                record.get("max_temperature") as Double)
+                    }
+                            .fromQuery("""
+                                SELECT year, month, day, max_temperature
+                                FROM [clouddataflow-readonly:samples.weather_stations]
+                                WHERE year BETWEEN 2007 AND 2009
+                            """.trimIndent())
+                            .withCoder(AvroCoder.of(WeatherData::class.java)))
+
+            // We will send the weather data into different tables for every year.
+            weatherData.apply<WriteResult>(
+                    BigQueryIO.write<WeatherData>()
+                            .to(
+                                    object : DynamicDestinations<WeatherData, Long>() {
+                                        override fun getDestination(elem: ValueInSingleWindow<WeatherData>): Long? {
+                                            return elem.value!!.year
+                                        }
+
+                                        override fun getTable(destination: Long?): TableDestination {
+                                            return TableDestination(
+                                                    TableReference()
+                                                            .setProjectId(writeProject)
+                                                            .setDatasetId(writeDataset)
+                                                            .setTableId("${writeTable}_$destination"),
+                                                    "Table for year $destination")
+                                        }
+
+                                        override fun getSchema(destination: Long?): TableSchema {
+                                            return TableSchema()
+                                                    .setFields(
+                                                            ImmutableList.of(
+                                                                    TableFieldSchema()
+                                                                            .setName("year")
+                                                                            .setType("INTEGER")
+                                                                            .setMode("REQUIRED"),
+                                                                    TableFieldSchema()
+                                                                            .setName("month")
+                                                                            .setType("INTEGER")
+                                                                            .setMode("REQUIRED"),
+                                                                    TableFieldSchema()
+                                                                            .setName("day")
+                                                                            .setType("INTEGER")
+                                                                            .setMode("REQUIRED"),
+                                                                    TableFieldSchema()
+                                                                            .setName("maxTemp")
+                                                                            .setType("FLOAT")
+                                                                            .setMode("NULLABLE")))
+                                        }
+                                    })
+                            .withFormatFunction {
+                                TableRow()
+                                        .set("year", it.year)
+                                        .set("month", it.month)
+                                        .set("day", it.day)
+                                        .set("maxTemp", it.maxTemp)
+                            }
+                            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))
+            // [END BigQueryWriteDynamicDestinations]
+
+            var tableSpec = "clouddataflow-readonly:samples.weather_stations"
+            if (writeProject.isNotEmpty() && writeDataset.isNotEmpty() && writeTable.isNotEmpty()) {
+                tableSpec = "$writeProject:$writeDataset.${writeTable}_partitioning"
+            }
+
+            val tableSchema = TableSchema()
+                    .setFields(
+                            ImmutableList.of(
+                                    TableFieldSchema().setName("year").setType("INTEGER").setMode("REQUIRED"),
+                                    TableFieldSchema()
+                                            .setName("month")
+                                            .setType("INTEGER")
+                                            .setMode("REQUIRED"),
+                                    TableFieldSchema().setName("day").setType("INTEGER").setMode("REQUIRED"),
+                                    TableFieldSchema()
+                                            .setName("maxTemp")
+                                            .setType("FLOAT")
+                                            .setMode("NULLABLE")))
+
+            // [START BigQueryTimePartitioning]
+            weatherData.apply<WriteResult>(
+                    BigQueryIO.write<WeatherData>()
+                            .to("${tableSpec}_partitioning")
+                            .withSchema(tableSchema)
+                            .withFormatFunction {
+                                TableRow()
+                                        .set("year", it.year)
+                                        .set("month", it.month)
+                                        .set("day", it.day)
+                                        .set("maxTemp", it.maxTemp)
+                            }
+                            // NOTE: an existing table without time partitioning set up will not work
+                            .withTimePartitioning(TimePartitioning().setType("DAY"))
+                            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))
+            // [END BigQueryTimePartitioning]
+        }
+    }
+
+    /** Helper function to format results in coGroupByKeyTuple.  */
+    fun formatCoGbkResults(
+            name: String?, emails: Iterable<String>, phones: Iterable<String>): String {
+
+        val emailsStr = emails.map { "'$it'" }.sorted().joinToString(", ", "[", "]")
+
+        val phonesStr = phones.map { "'$it'" }.sorted().joinToString(", ", "[", "]")
+
+        return "$name; $emailsStr; $phonesStr"
+    }
+
+    /** Using a CoGroupByKey transform.  */
+    fun coGroupByKeyTuple(
+            emailsTag: TupleTag<String>,
+            phonesTag: TupleTag<String>,
+            emails: PCollection<KV<String, String>>,
+            phones: PCollection<KV<String, String>>): PCollection<String> {
+
+        // [START CoGroupByKeyTuple]
+        val results = KeyedPCollectionTuple.of(emailsTag, emails)
+                .and(phonesTag, phones)
+                .apply(CoGroupByKey.create())
+
+        // [END CoGroupByKeyTuple]
+        return results.apply(
+                ParDo.of(
+                        object : DoFn<KV<String, CoGbkResult>, String>() {
+                            @ProcessElement
+                            fun processElement(c: ProcessContext) {
+                                val e = c.element()
+                                val name = e.key
+                                val emailsIter = e.value.getAll(emailsTag)
+                                val phonesIter = e.value.getAll(phonesTag)
+                                val formattedResult = formatCoGbkResults(name, emailsIter, phonesIter)
+                                c.output(formattedResult)
+                            }
+                        }))
+    }
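+
+    /**
+     * Not one of the published snippets: a minimal usage sketch for [coGroupByKeyTuple]. The
+     * pipeline is supplied by the caller, and the names, emails and phone numbers below are
+     * illustrative placeholders only.
+     */
+    fun coGroupByKeyTupleUsageSketch(pipeline: Pipeline): PCollection<String> {
+        val emailsTag = TupleTag<String>()
+        val phonesTag = TupleTag<String>()
+
+        val emails = pipeline.apply(
+                "CreateEmails",
+                Create.of(
+                        KV.of("amy", "amy@example.com"),
+                        KV.of("james", "james@example.com")))
+
+        val phones = pipeline.apply(
+                "CreatePhones",
+                Create.of(
+                        KV.of("amy", "111-222-3333"),
+                        KV.of("james", "222-333-4444")))
+
+        // Each output line looks like: "amy; ['amy@example.com']; ['111-222-3333']".
+        return coGroupByKeyTuple(emailsTag, phonesTag, emails, phones)
+    }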
+}