You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/02/09 01:44:10 UTC
[beam] branch master updated: [BEAM-13732] Add example for Go BigQuery IO wrapper. (#16786)
This is an automated email from the ASF dual-hosted git repository.
danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6fe157a [BEAM-13732] Add example for Go BigQuery IO wrapper. (#16786)
6fe157a is described below
commit 6fe157aa3857dc44fc042bdb2d45a9fbf2dbaf1b
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Tue Feb 8 17:42:59 2022 -0800
[BEAM-13732] Add example for Go BigQuery IO wrapper. (#16786)
Also does some polish and fixup to the Kafka taxi example to fix little nits that were noticed while adding this new example.
---
sdks/go/examples/kafka/taxi.go | 47 +++---
sdks/go/examples/xlang/bigquery/wordcount.go | 220 +++++++++++++++++++++++++++
2 files changed, 245 insertions(+), 22 deletions(-)
diff --git a/sdks/go/examples/kafka/taxi.go b/sdks/go/examples/kafka/taxi.go
index 2c9c3c2..aeb539b 100644
--- a/sdks/go/examples/kafka/taxi.go
+++ b/sdks/go/examples/kafka/taxi.go
@@ -22,7 +22,9 @@
//
// Running this example requires a Kafka cluster accessible to the runner, and
// a cross-language expansion service that can expand Kafka read and write
-// transforms.
+// transforms. An address to a persistent expansion service can be provided as
+// a flag, or if none is specified then the SDK will attempt to automatically
+// start an appropriate expansion service.
//
// Setting Up a Kafka Cluster
//
@@ -34,22 +36,24 @@
//
// Running an Expansion Server
//
-// These instructions will cover running the Java IO Expansion Service, and
-// therefore requires a JDK installation in a version supported by Beam.
-// Depending on whether you are running this from a numbered Beam release, or a
-// development environment, there are two sources you may use for the Expansion
-// service.
+// If the automatic expansion service functionality is not available for your
+// environment, or if you want improved performance, you will need to start a
+// persistent expansion service. These instructions will cover running the Java
+// IO Expansion Service, and therefore requires a JDK installation in a version
+// supported by Beam. Depending on whether you are running this from a numbered
+// Beam release, or a development environment, there are two sources you may
+// use for the Expansion service.
//
// Numbered release: The expansion service jar is vendored as module
-// org.apache.beam:beam-sdks-java-io-expansion-service in Maven Repository.
-// This jar can be executed directly with the following command:
+// org.apache.beam:beam-sdks-java-io-expansion-service in Maven Repository.
+// This jar can be executed directly with the following command:
// `java -jar <jar_name> <port_number>`
// Development env: This requires that the JAVA_HOME environment variable
-// points to your JDK installation. From the root `beam/` directory of the
-// Apache Beam repository, the jar can be built (or built and run) with the
-// following commands:
-// Build: ./gradlew :sdks:java:io:expansion-service:build
-// Build and Run: ./gradlew :sdks:java:io:expansion-service:runExpansionService -PconstructionService.port=<port_num>
+// points to your JDK installation. From the root `beam/` directory of the
+// Apache Beam repository, the jar can be built (or built and run) with the
+// following commands:
+// Build: ./gradlew :sdks:java:io:expansion-service:build
+// Build and Run: ./gradlew :sdks:java:io:expansion-service:runExpansionService -PconstructionService.port=<port_num>
//
// Running the Example on GCP
//
@@ -64,7 +68,7 @@
// export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
// export BOOTSTRAP_SERVERS="123.45.67.89:1234"
// export EXPANSION_ADDR="localhost:1234"
-// go run ./sdks/go/examples/kafka/types/types.go \
+// go run ./sdks/go/examples/kafka/taxi.go \
// --runner=DataflowRunner \
// --temp_location=$TEMP_LOCATION \
// --staging_location=$STAGING_LOCATION \
@@ -72,7 +76,6 @@
// --region=$REGION \
// --job_name="${JOB_NAME}" \
// --bootstrap_servers=$BOOTSTRAP_SERVERS \
-// --experiments=use_portable_job_submission,use_runner_v2 \
// --expansion_addr=$EXPANSION_ADDR
//
// Running the Example From a Git Clone
@@ -91,9 +94,11 @@
// accessible locally.
//
// Additionally, you must provide the location of your custom container to the
-// pipeline with the --sdk_harness_container_image_override flag. For example:
+// pipeline with the --sdk_harness_container_image_override flag for Java, or
+// --environment_config flag for Go. For example:
//
-// --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest"
+// --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest" \
+// --environment_config=${DOCKER_ROOT}/beam_go_sdk:latest
package main
import (
@@ -111,9 +116,10 @@ import (
)
var (
- expansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
+ expansionAddr = flag.String("expansion_addr", "",
+ "Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
bootstrapServers = flag.String("bootstrap_servers", "",
- "URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+ "(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
)
@@ -139,9 +145,6 @@ func main() {
beam.Init()
ctx := context.Background()
- if *expansionAddr == "" {
- log.Fatal(ctx, "No expansion address provided")
- }
p := beam.NewPipeline()
s := p.Root()
diff --git a/sdks/go/examples/xlang/bigquery/wordcount.go b/sdks/go/examples/xlang/bigquery/wordcount.go
new file mode 100644
index 0000000..9500d2c
--- /dev/null
+++ b/sdks/go/examples/xlang/bigquery/wordcount.go
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Wordcount is an example using cross-language BigQuery transforms to read and write to BigQuery.
+// This example runs a batch pipeline that reads from the public table "shakespeare" described here:
+// https://cloud.google.com/bigquery/public-data#sample_tables. It reads the data of word counts per
+// different work, aggregates them to find total word counts in all works, as well as the average
+// number of times a word appears if it appears in a work, and then writes all that data to a given
+// output table.
+//
+// This example is only expected to work on Dataflow, and requires a cross-language expansion
+// service that can expand BigQuery read and write transforms. An address to a persistent expansion
+// service can be provided as a flag, or if none is specified then the SDK will attempt to
+// automatically start an appropriate expansion service.
+//
+// Running an Expansion Server
+//
+// If the automatic expansion service functionality is not available for your environment, or if
+// you want improved performance, you will need to start a persistent expansion service. These
+// instructions will cover running the Java SchemaIO Expansion Service, and therefore requires a JDK
+// installation in a version supported by Beam. Depending on whether you are running this from a
+// numbered Beam release, or a development environment, there are two sources you may use for the
+// Expansion service.
+//
+// Numbered release: The expansion service jar is vendored as module
+// org.apache.beam:beam-sdks-java-io-google-cloud-platform-expansion-service in Maven Repository.
+// This jar can be executed directly with the following command:
+// `java -jar <jar_name> <port_number>`
+//
+// Development env: This requires that the JAVA_HOME environment variable points to your JDK
+// installation. From the root `beam/` directory of the Apache Beam repository, the jar can be
+// built (or built and run) with the following commands:
+// ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build
+// ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:runExpansionService -PconstructionService.port=<port_num>
+//
+// Running the Example on GCP
+//
+// An example command for executing this pipeline on GCP is as follows:
+// export PROJECT="$(gcloud config get-value project)"
+// export TEMP_LOCATION="gs://MY-BUCKET/temp"
+// export STAGING_LOCATION="gs://MY-BUCKET/staging"
+// export REGION="us-central1"
+// export JOB_NAME="bigquery-wordcount-`date +%Y%m%d-%H%M%S`"
+// export EXPANSION_ADDR="localhost:1234"
+// export OUTPUT_TABLE="project_id:dataset_id.table_id"
+// go run ./sdks/go/examples/xlang/bigquery/wordcount.go \
+// --runner=DataflowRunner \
+// --temp_location=$TEMP_LOCATION \
+// --staging_location=$STAGING_LOCATION \
+// --project=$PROJECT \
+// --region=$REGION \
+// --job_name="${JOB_NAME}" \
+// --expansion_addr=$EXPANSION_ADDR \
+// --out_table=$OUTPUT_TABLE
+//
+// Running the Example From a Git Clone
+//
+// When running on a development environment, a custom container will likely need to be provided
+// for the cross-language SDK. First this will require building and pushing the SDK container to
+// a container repository, such as Docker Hub.
+//
+// export DOCKER_ROOT="Your Docker Repository Root"
+// ./gradlew :sdks:java:container:java8:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
+// docker push $DOCKER_ROOT/beam_java8_sdk:latest
+//
+// For runners in local mode, simply building the container using the default values for
+// docker-repository-root and docker-tag will work to have it accessible locally.
+//
+// Additionally, you must provide the location of your custom container to the pipeline with the
+// --sdk_harness_container_image_override flag for Java, or --environment_config flag for Go. For
+// example:
+//
+// --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest" \
+// --environment_config=${DOCKER_ROOT}/beam_go_sdk:latest
+package main
+
+import (
+ "context"
+ "flag"
+ "math"
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+)
+
+var (
+	// Set this to the address of the expansion service to use for BigQuery read and write, or leave
+	// unspecified to attempt to automatically start an expansion service.
+	expansionAddr = flag.String("expansion_addr", "",
+		"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
+	// Set this required option to specify where to write the output, in the form
+	// "project_id:dataset_id.table_id". If the table does not exist, a new one will be created. If
+	// the table already exists, elements will be appended to it.
+	outTable = flag.String("out_table", "",
+		"(Required) Output table, in the form project_id:dataset_id.table_id.")
+)
+
+func init() {
+	// Register the example's row, accumulator and CombineFn types with the Beam SDK. The order
+	// matches the original registration sequence.
+	for _, ptr := range []interface{}{
+		(*ShakesRow)(nil),
+		(*WordsCombine)(nil),
+		(*CountsRow)(nil),
+		(*WordsAccum)(nil),
+	} {
+		beam.RegisterType(reflect.TypeOf(ptr))
+	}
+}
+
+// ShakesRow is a struct corresponding to the schema of the Shakespeare input table. In order to
+// be read properly, field names must match names from the BigQuery table, so some fields must
+// include underscores.
+type ShakesRow struct {
+ // Word is the word being counted.
+ Word string `beam:"word"`
+ // Word_count is the count of Word within the single work identified by Corpus.
+ Word_count int64 `beam:"word_count"`
+ // Corpus identifies the work that Word_count applies to.
+ Corpus string `beam:"corpus"`
+ // Corpus_date is read from the input table but not used by this pipeline.
+ Corpus_date int64 `beam:"corpus_date"`
+}
+
+// CountsRow is a struct corresponding to the schema of the output table. For writes, field names
+// are derived from the Beam schema names specified below as struct tags.
+//
+// NOTE: the write appends to an already-existing output table, so renaming a tag here will likely
+// produce rows whose columns no longer match that table.
+type CountsRow struct {
+ // Word is the word being counted.
+ Word string `beam:"word"`
+ // WordCount is the count of how many times the word appears in all works combined.
+ WordCount int64 `beam:"word_count"`
+ // CorpusCount is the count of how many works the word appears in.
+ CorpusCount int64 `beam:"corpus_count"`
+ // AvgCount is the average number of times a word appears in all works that it appears in. In
+ // other words, this is equivalent to WordCount divided by CorpusCount.
+ AvgCount float64 `beam:"avg_count"`
+}
+
+// WordsAccum is an accumulator for combining Shakespeare word counts in order to get averages of
+// word counts.
+type WordsAccum struct {
+ // Word is the word being counted. It is empty until the accumulator has seen an input.
+ Word string
+ // Count is the number of times this word has appeared, or in other words the number of corpora
+ // it appears in (assuming that the input never repeats a word and corpus pair).
+ Count int64
+ // Sum is the sum of the Word_count values of all inputs added to this accumulator.
+ Sum int64
+}
+
+// WordsCombine is a CombineFn that adds up word counts and calculates average number of counts.
+type WordsCombine struct{}
+
+// CreateAccumulator creates an empty WordsAccum. Its Word stays empty until an input is added or
+// a non-empty accumulator is merged into it.
+func (fn *WordsCombine) CreateAccumulator() WordsAccum {
+	return WordsAccum{}
+}
+
+// AddInput records one (word, corpus) row. It sets the word, increments the corpus count, and
+// adds the row's word count to the running sum.
+func (fn *WordsCombine) AddInput(a WordsAccum, row ShakesRow) WordsAccum {
+	a.Word = row.Word
+	a.Count++
+	a.Sum += row.Word_count
+	return a
+}
+
+// MergeAccumulators sums up the counts of two partial accumulators for the same key. The word is
+// taken from whichever side has one, so merging with an empty (freshly created) accumulator in
+// either argument position does not lose it.
+func (fn *WordsCombine) MergeAccumulators(a, v WordsAccum) WordsAccum {
+	word := a.Word
+	if word == "" {
+		word = v.Word
+	}
+	return WordsAccum{Word: word, Count: a.Count + v.Count, Sum: a.Sum + v.Sum}
+}
+
+// ExtractOutput calculates the average and fills out the output row.
+func (fn *WordsCombine) ExtractOutput(a WordsAccum) CountsRow {
+	row := CountsRow{
+		Word:        a.Word,
+		WordCount:   a.Sum,
+		CorpusCount: a.Count,
+	}
+	// Count is zero only for an accumulator that never received input. Report NaN rather than
+	// dividing by zero.
+	if a.Count == 0 {
+		row.AvgCount = math.NaN()
+		return row
+	}
+	row.AvgCount = float64(a.Sum) / float64(a.Count)
+	return row
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	ctx := context.Background()
+
+	// --out_table is required. Fail fast with a clear message rather than handing an empty table
+	// spec to bigqueryio.Write.
+	if *outTable == "" {
+		log.Fatal(ctx, "No output table provided. Set it with --out_table.")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	// Read from the public BigQuery table. An empty expansion address lets the SDK attempt to
+	// start an expansion service automatically.
+	inType := reflect.TypeOf((*ShakesRow)(nil)).Elem()
+	rows := bigqueryio.Read(s, inType,
+		bigqueryio.FromTable("bigquery-public-data:samples.shakespeare"),
+		bigqueryio.ReadExpansionAddr(*expansionAddr))
+
+	// Key each row by its word so the combine aggregates one word across all works.
+	keyed := beam.ParDo(s, func(elm ShakesRow) (string, ShakesRow) {
+		return elm.Word, elm
+	}, rows)
+	counts := beam.CombinePerKey(s, &WordsCombine{}, keyed)
+	// Each CountsRow already carries its word, so the key can be dropped before writing.
+	countVals := beam.DropKey(s, counts)
+
+	// Write the data to the given BigQuery table destination, creating the table if needed.
+	bigqueryio.Write(s, *outTable, countVals,
+		bigqueryio.CreateDisposition(bigqueryio.CreateIfNeeded),
+		bigqueryio.WriteExpansionAddr(*expansionAddr))
+
+	if err := beamx.Run(ctx, p); err != nil {
+		log.Fatalf(ctx, "Failed to execute job: %v", err)
+	}
+}