You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/02/09 01:44:10 UTC

[beam] branch master updated: [BEAM-13732] Add example for Go BigQuery IO wrapper. (#16786)

This is an automated email from the ASF dual-hosted git repository.

danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fe157a  [BEAM-13732] Add example for Go BigQuery IO wrapper. (#16786)
6fe157a is described below

commit 6fe157aa3857dc44fc042bdb2d45a9fbf2dbaf1b
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Tue Feb 8 17:42:59 2022 -0800

    [BEAM-13732] Add example for Go BigQuery IO wrapper. (#16786)
    
    Also does some polish and fixup to the Kafka taxi example to fix little nits that were noticed while adding this new example.
---
 sdks/go/examples/kafka/taxi.go               |  47 +++---
 sdks/go/examples/xlang/bigquery/wordcount.go | 220 +++++++++++++++++++++++++++
 2 files changed, 245 insertions(+), 22 deletions(-)

diff --git a/sdks/go/examples/kafka/taxi.go b/sdks/go/examples/kafka/taxi.go
index 2c9c3c2..aeb539b 100644
--- a/sdks/go/examples/kafka/taxi.go
+++ b/sdks/go/examples/kafka/taxi.go
@@ -22,7 +22,9 @@
 //
 // Running this example requires a Kafka cluster accessible to the runner, and
 // a cross-language expansion service that can expand Kafka read and write
-// transforms.
+// transforms. An address to a persistent expansion service can be provided as
+// a flag, or if none is specified then the SDK will attempt to automatically
+// start an appropriate expansion service.
 //
 // Setting Up a Kafka Cluster
 //
@@ -34,22 +36,24 @@
 //
 // Running an Expansion Server
 //
-// These instructions will cover running the Java IO Expansion Service, and
-// therefore requires a JDK installation in a version supported by Beam.
-// Depending on whether you are running this from a numbered Beam release, or a
-// development environment, there are two sources you may use for the Expansion
-// service.
+// If the automatic expansion service functionality is not available for your
+// environment, or if you want improved performance, you will need to start a
+// persistent expansion service. These instructions will cover running the Java
+// IO Expansion Service, and therefore requires a JDK installation in a version
+// supported by Beam. Depending on whether you are running this from a numbered
+// Beam release, or a development environment, there are two sources you may
+// use for the Expansion service.
 //
 // Numbered release: The expansion service jar is vendored as module
-//   org.apache.beam:beam-sdks-java-io-expansion-service in Maven Repository.
-//   This jar can be executed directly with the following command:
+// org.apache.beam:beam-sdks-java-io-expansion-service in Maven Repository.
+// This jar can be executed directly with the following command:
 //   `java -jar <jar_name> <port_number>`
 // Development env: This requires that the JAVA_HOME environment variable
-//   points to your JDK installation. From the root `beam/` directory of the
-//   Apache Beam repository, the jar can be built (or built and run) with the
-//   following commands:
-//     Build: ./gradlew :sdks:java:io:expansion-service:build
-//     Build and Run: ./gradlew :sdks:java:io:expansion-service:runExpansionService -PconstructionService.port=<port_num>
+// points to your JDK installation. From the root `beam/` directory of the
+// Apache Beam repository, the jar can be built (or built and run) with the
+// following commands:
+//   Build: ./gradlew :sdks:java:io:expansion-service:build
+//   Build and Run: ./gradlew :sdks:java:io:expansion-service:runExpansionService -PconstructionService.port=<port_num>
 //
 // Running the Example on GCP
 //
@@ -64,7 +68,7 @@
 //   export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
 //   export BOOTSTRAP_SERVERS="123.45.67.89:1234"
 //   export EXPANSION_ADDR="localhost:1234"
-//   go run ./sdks/go/examples/kafka/types/types.go \
+//   go run ./sdks/go/examples/kafka/taxi.go \
 //     --runner=DataflowRunner \
 //     --temp_location=$TEMP_LOCATION \
 //     --staging_location=$STAGING_LOCATION \
@@ -72,7 +76,6 @@
 //     --region=$REGION \
 //     --job_name="${JOB_NAME}" \
 //     --bootstrap_servers=$BOOTSTRAP_SERVER \
-//     --experiments=use_portable_job_submission,use_runner_v2 \
 //     --expansion_addr=$EXPANSION_ADDR
 //
 // Running the Example From a Git Clone
@@ -91,9 +94,11 @@
 // accessible locally.
 //
 // Additionally, you must provide the location of your custom container to the
-// pipeline with the --sdk_harness_container_image_override flag. For example:
+// pipeline with the --sdk_harness_container_image_override flag for Java, or
+// --environment_config flag for Go. For example:
 //
-//   --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest"
+//   --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest" \
+//   --environment_config=${DOCKER_ROOT}/beam_go_sdk:latest
 package main
 
 import (
@@ -111,9 +116,10 @@ import (
 )
 
 var (
-	expansionAddr    = flag.String("expansion_addr", "", "Address of Expansion Service")
+	expansionAddr = flag.String("expansion_addr", "",
+		"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
 	bootstrapServers = flag.String("bootstrap_servers", "",
-		"URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+		"(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
 	topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
 )
 
@@ -139,9 +145,6 @@ func main() {
 	beam.Init()
 
 	ctx := context.Background()
-	if *expansionAddr == "" {
-		log.Fatal(ctx, "No expansion address provided")
-	}
 
 	p := beam.NewPipeline()
 	s := p.Root()
diff --git a/sdks/go/examples/xlang/bigquery/wordcount.go b/sdks/go/examples/xlang/bigquery/wordcount.go
new file mode 100644
index 0000000..9500d2c
--- /dev/null
+++ b/sdks/go/examples/xlang/bigquery/wordcount.go
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Wordcount is an example using cross-language BigQuery transforms to read and write to BigQuery.
+// This example runs a batch pipeline that reads from the public table "shakespeare" described here:
+// https://cloud.google.com/bigquery/public-data#sample_tables. It reads the data of word counts per
+// different work, aggregates them to find total word counts in all works, as well as the average
+// number of times a word appears if it appears in a work, and then writes all that data to a given
+// output table.
+//
+// This example is only expected to work on Dataflow, and requires a cross-language expansion
+// service that can expand BigQuery read and write transforms. An address to a persistent expansion
+// service can be provided as a flag, or if none is specified then the SDK will attempt to
+// automatically start an appropriate expansion service.
+//
+// Running an Expansion Server
+//
+// If the automatic expansion service functionality is not available for your environment, or if
+// you want improved performance, you will need to start a persistent expansion service. These
+// instructions will cover running the Java SchemaIO Expansion Service, and therefore require a JDK
+// installation in a version supported by Beam. Depending on whether you are running this from a
+// numbered Beam release, or a development environment, there are two sources you may use for the
+// Expansion service.
+//
+// Numbered release: The expansion service jar is vendored as module
+// org.apache.beam:beam-sdks-java-io-google-cloud-platform-expansion-service in Maven Repository.
+// This jar can be executed directly with the following command:
+//   `java -jar <jar_name> <port_number>`
+//
+// Development env: This requires that the JAVA_HOME environment variable points to your JDK
+// installation. From the root `beam/` directory of the Apache Beam repository, the jar can be
+// built (or built and run) with the following commands:
+//   ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build
+//   ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:runExpansionService -PconstructionService.port=<port_num>
+//
+// Running the Example on GCP
+//
+// An example command for executing this pipeline on GCP is as follows:
+//   export PROJECT="$(gcloud config get-value project)"
+//   export TEMP_LOCATION="gs://MY-BUCKET/temp"
+//   export REGION="us-central1"
+//   export JOB_NAME="bigquery-wordcount-`date +%Y%m%d-%H%M%S`"
+//   export STAGING_LOCATION="gs://MY-BUCKET/staging"
+//   export EXPANSION_ADDR="localhost:1234"
+//   export OUTPUT_TABLE="project_id:dataset_id.table_id"
+//   go run ./sdks/go/examples/xlang/bigquery/wordcount.go \
+//     --runner=DataflowRunner \
+//     --temp_location=$TEMP_LOCATION \
+//     --staging_location=$STAGING_LOCATION \
+//     --project=$PROJECT \
+//     --region=$REGION \
+//     --job_name="${JOB_NAME}" \
+//     --expansion_addr=$EXPANSION_ADDR \
+//     --out_table=$OUTPUT_TABLE
+//
+// Running the Example From a Git Clone
+//
+// When running on a development environment, a custom container will likely need to be provided
+// for the cross-language SDK. First this will require building and pushing the SDK container to
+// a container repository, such as Docker Hub.
+//
+//   export DOCKER_ROOT="Your Docker Repository Root"
+//   ./gradlew :sdks:java:container:java8:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
+//   docker push $DOCKER_ROOT/beam_java8_sdk:latest
+//
+// For runners in local mode, simply building the container using the default values for
+// docker-repository-root and docker-tag will work to have it accessible locally.
+//
+// Additionally, you must provide the location of your custom container to the pipeline with the
+// --sdk_harness_container_image_override flag for Java, or --environment_config flag for Go. For
+// example:
+//
+//   --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest" \
+//   --environment_config=${DOCKER_ROOT}/beam_go_sdk:latest
+package main
+
+import (
+	"context"
+	"flag"
+	"math"
+	"reflect"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+)
+
+var (
+	// expansionAddr is the address of the expansion service used for both the BigQuery read and
+	// the BigQuery write. Leave it unset to attempt to automatically start an expansion service.
+	expansionAddr = flag.String("expansion_addr", "",
+		"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
+	// outTable is the required destination table for the output. If the table does not exist,
+	// a new one will be created. If the table already exists, elements will be appended to it.
+	outTable = flag.String("out_table", "", "Output table (required).")
+)
+
+// init registers the pointer types of the row structs, the accumulator, and the CombineFn with
+// Beam through beam.RegisterType.
+func init() {
+	registered := []reflect.Type{
+		reflect.TypeOf((*ShakesRow)(nil)),
+		reflect.TypeOf((*WordsCombine)(nil)),
+		reflect.TypeOf((*CountsRow)(nil)),
+		reflect.TypeOf((*WordsAccum)(nil)),
+	}
+	for _, t := range registered {
+		beam.RegisterType(t)
+	}
+}
+
+// ShakesRow is a struct corresponding to the schema of the Shakespeare input table. In order to
+// be read properly, field names must match names from the BigQuery table, so some fields must
+// include underscores.
+type ShakesRow struct {
+	Word        string `beam:"word"`        // the word this row counts
+	Word_count  int64  `beam:"word_count"`  // count of Word in this row's work; summed by AddInput
+	Corpus      string `beam:"corpus"`      // the work the row belongs to
+	Corpus_date int64  `beam:"corpus_date"` // presumably the date of the work; unused by this pipeline
+}
+
+// CountsRow is a struct corresponding to the schema of the output table. For writes, field names
+// are derived from the Beam schema names specified below as struct tags.
+type CountsRow struct {
+	// Word is the word being counted.
+	Word string `beam:"word"`
+	// WordCount is the count of how many times the word appears in all works combined.
+	WordCount int64 `beam:"word_count"`
+	// CorpusCount is the count of how many works the word appears in.
+	CorpusCount int64 `beam:"corpus_count"`
+	// AvgCount is the average number of times the word appears per work that contains it, i.e.
+	// WordCount divided by CorpusCount. ExtractOutput sets it to NaN when CorpusCount is zero.
+	AvgCount float64 `beam:"avg_count"`
+}
+
+// WordsAccum is an accumulator for combining Shakespeare word counts in order to get averages of
+// word counts.
+type WordsAccum struct {
+	// Word is the word being counted.
+	Word string
+	// Count is the number of input rows added so far, or in other words the number of works the
+	// word appears in (assuming that the input never repeats a word and corpus pair).
+	Count int64
+	// Sum is the sum of word counts from inputs.
+	Sum int64
+}
+
+// WordsCombine is a CombineFn that totals each word's counts and averages them per work.
+type WordsCombine struct{}
+
+// CreateAccumulator creates a default WordsAccum.
+func (fn *WordsCombine) CreateAccumulator() WordsAccum {
+	return WordsAccum{}
+}
+
+// AddInput folds one Shakespeare row into the accumulator: the row's word is recorded, the
+// corpus count goes up by one, and the row's word count is added to the running sum.
+func (fn *WordsCombine) AddInput(a WordsAccum, row ShakesRow) WordsAccum {
+	return WordsAccum{
+		Word:  row.Word,
+		Count: a.Count + 1,
+		Sum:   a.Sum + row.Word_count,
+	}
+}
+
+// MergeAccumulators adds the corpus counts and word-count sums of two accumulators for the same
+// word. The word is taken from whichever accumulator has one, so that merging an empty
+// accumulator (as returned by CreateAccumulator) with a populated one gives the same result in
+// either argument order.
+func (fn *WordsCombine) MergeAccumulators(a, v WordsAccum) WordsAccum {
+	word := a.Word
+	if word == "" {
+		// a has seen no input yet; fall back to v so the word is not dropped.
+		word = v.Word
+	}
+	return WordsAccum{Word: word, Count: a.Count + v.Count, Sum: a.Sum + v.Sum}
+}
+
+// ExtractOutput converts a finished accumulator into an output row. The average is the sum of
+// word counts divided by the corpus count, or NaN when the accumulator saw no input.
+func (fn *WordsCombine) ExtractOutput(a WordsAccum) CountsRow {
+	avg := math.NaN()
+	if a.Count != 0 {
+		avg = float64(a.Sum) / float64(a.Count)
+	}
+	return CountsRow{
+		Word:        a.Word,
+		WordCount:   a.Sum,
+		CorpusCount: a.Count,
+		AvgCount:    avg,
+	}
+}
+
+// main builds and runs the pipeline: it reads the public Shakespeare table, combines the rows
+// per word with WordsCombine, and writes the resulting CountsRow values to the table named by
+// the out_table flag. It exits with a fatal log message if out_table is missing or the job fails.
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	ctx := context.Background()
+	// out_table is documented as required; fail fast rather than build a pipeline that has no
+	// destination to write to.
+	if *outTable == "" {
+		log.Fatal(ctx, "No output table provided. Use --out_table to specify one.")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	// Read from the public BigQuery table.
+	inType := reflect.TypeOf((*ShakesRow)(nil)).Elem()
+	rows := bigqueryio.Read(s, inType,
+		bigqueryio.FromTable("bigquery-public-data:samples.shakespeare"),
+		bigqueryio.ReadExpansionAddr(*expansionAddr))
+
+	// Combine the data per word. Rows are keyed by word first so CombinePerKey groups them, and
+	// the key is dropped afterwards because CountsRow already carries the word.
+	keyed := beam.ParDo(s, func(elm ShakesRow) (string, ShakesRow) {
+		return elm.Word, elm
+	}, rows)
+	counts := beam.CombinePerKey(s, &WordsCombine{}, keyed)
+	countVals := beam.DropKey(s, counts)
+
+	// Write the data to the given BigQuery table destination, creating the table if needed.
+	bigqueryio.Write(s, *outTable, countVals,
+		bigqueryio.CreateDisposition(bigqueryio.CreateIfNeeded),
+		bigqueryio.WriteExpansionAddr(*expansionAddr))
+
+	if err := beamx.Run(ctx, p); err != nil {
+		log.Fatalf(ctx, "Failed to execute job: %v", err)
+	}
+}