You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ri...@apache.org on 2023/06/26 21:31:30 UTC
[beam] branch master updated: [Tour of Beam] add work example (#27080)
This is an automated email from the ASF dual-hosted git repository.
riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 31cb00e7d87 [Tour of Beam] add work example (#27080)
31cb00e7d87 is described below
commit 31cb00e7d87080d4d085c367a5524d902fb2c902
Author: Abzal Tuganbay <ab...@gmail.com>
AuthorDate: Tue Jun 27 03:31:24 2023 +0600
[Tour of Beam] add work example (#27080)
* add work example
* correct
* correct tags for bigquery examples
* correct read-query
* correct read-query tag
* correct imports
* remove package
* correct
* fixed example name
---------
Co-authored-by: mende1esmende1es <me...@gmail.com>
Co-authored-by: Oleh Borysevych <ol...@akvelon.com>
---
.../io/big-query-io/read-query/go-example/main.go | 74 ++++++++++++----------
.../big-query-io/read-query/java-example/Task.java | 64 +++++++++----------
.../big-query-io/read-query/python-example/task.py | 52 ++++++++-------
.../io/big-query-io/read-table/go-example/main.go | 3 +-
.../big-query-io/read-table/java-example/Task.java | 6 +-
.../big-query-io/read-table/python-example/task.py | 9 +--
6 files changed, 107 insertions(+), 101 deletions(-)
diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go
index fec979ad7ed..49ab6057bac 100644
--- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go
+++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go
@@ -18,9 +18,11 @@
// beam-playground:
// name: read-query
-// description: BigQuery read query example.
+// description: BigQueryIO read query example.
// multifile: false
-// context_line: 40
+// context_line: 42
+// never_run: true
+// always_run: true
// categories:
// - Quickstart
// complexity: ADVANCED
@@ -29,47 +31,49 @@
package main
import (
- _ "context"
- _ "flag"
- _ "github.com/apache/beam/sdks/v2/go/pkg/beam"
- _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
- _ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
- _ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
- _ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+ "context"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+
+ "cloud.google.com/go/bigquery"
internal_log "log"
- _ "reflect"
+ "reflect"
)
-// Define the data model: The CommentRow struct is defined, which models one row of HackerNews comments.
-//The bigquery tag in the struct field is used to map the struct field to the BigQuery column.
-type CommentRow struct {
- Text string `bigquery:"text"`
+type Game struct {
+ GameID bigquery.NullString `bigquery:"gameId"`
+ GameNumber bigquery.NullInt64 `bigquery:"gameNumber"`
+ SeasonID bigquery.NullString `bigquery:"seasonId"`
+ Year bigquery.NullInt64 `bigquery:"year"`
+ Type bigquery.NullString `bigquery:"type"`
+ DayNight bigquery.NullString `bigquery:"dayNight"`
+ Duration bigquery.NullString `bigquery:"duration"`
}
-// Construct the BigQuery query: A constant query is defined that selects the text column
-// from the bigquery-public-data.hacker_news.comments table for a certain time range.
-const query = `SELECT text
-FROM ` + "`bigquery-public-data.hacker_news.comments`" + `
-WHERE time_ts BETWEEN '2013-01-01' AND '2014-01-01'
-LIMIT 1000
-`
-
func main() {
internal_log.Println("Running Task")
- /*
- ctx := context.Background()
- p := beam.NewPipeline()
- s := p.Root()
- project := "tess-372508"
- // Build a PCollection<CommentRow> by querying BigQuery.
- rows := bigqueryio.Query(s, project, query,
- reflect.TypeOf(CommentRow{}), bigqueryio.UseStandardSQL())
+ ctx := context.Background()
+ p := beam.NewPipeline()
+ s := p.Root()
+ project := "apache-beam-testing"
- debug.Print(s, rows)
+ // Build a PCollection<Game> by querying BigQuery.
+ rows := bigqueryio.Query(s, project, "select * from `bigquery-public-data.baseball.schedules`",
+ reflect.TypeOf(Game{}), bigqueryio.UseStandardSQL())
- // Now that the pipeline is fully constructed, we execute it.
- if err := beamx.Run(ctx, p); err != nil {
- log.Exitf(ctx, "Failed to execute job: %v", err)
- }*/
+ fixedSizeLines := top.Largest(s, rows, 5, less)
+
+ debug.Print(s, fixedSizeLines)
+ // Now that the pipeline is fully constructed, we execute it.
+ if err := beamx.Run(ctx, p); err != nil {
+ log.Exitf(ctx, "Failed to execute job: %v", err)
+ }
+}
+func less(a, b Game) bool {
+ return true
}
diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java
index 256c70919ce..12c1fbcd9b4 100644
--- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java
+++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java
@@ -27,11 +27,10 @@
// tags:
// - hellobeam
+import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
-import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
@@ -40,52 +39,49 @@ import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Task {
+public class Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
- public static void main(String[] args) {
- LOG.info("Running Task");
- System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json");
- PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
- options.setTempLocation("gs://bucket");
- options.as(BigQueryOptions.class).setProject("project-id");
+ private static final String WEATHER_SAMPLES_QUERY =
+ "select * from `clouddataflow-readonly.samples.weather_stations`";
- Pipeline pipeline = Pipeline.create(options);
+ public static void applyBigQueryTornadoes(Pipeline p) {
+ /*TypedRead<TableRow> bigqueryIO =
+ BigQueryIO.readTableRows()
+ .fromQuery(WEATHER_SAMPLES_QUERY)
+ .usingStandardSql();
- // pCollection.apply(BigQueryIO.read(... - This part of the pipeline reads from a BigQuery table using a SQL query and stores the result in a PCollection.
- // The BigQueryIO.read() function is used to read from BigQuery. It is configured with a lambda function to extract a field from each record.
- // The .fromQuery("SELECT field FROM project-id.dataset.table")
- // specifies the SQL query used to read from BigQuery. You should replace "field", "project-id", "dataset", and "table" with your specific field name, project id, dataset name, and table name, respectively.
-/*
- PCollection<Double> pCollection = pipeline
- .apply(BigQueryIO.read(
- (SchemaAndRecord elem) -> (Double) elem.getRecord().get("field"))
- .fromQuery(
- "SELECT field FROM `project-id.dataset.table`")
- .usingStandardSql()
- .withCoder(DoubleCoder.of()));
- pCollection
- .apply("Log words", ParDo.of(new LogOutput<>()));
-*/
- pipeline.run();
+ PCollection<TableRow> rowsFromBigQuery = p.apply(bigqueryIO);
+
+ rowsFromBigQuery
+ .apply(ParDo.of(new LogOutput<>("Result: ")));*/
+ }
+
+ public static void runBigQueryTornadoes(PipelineOptions options) {
+ Pipeline p = Pipeline.create(options);
+ applyBigQueryTornadoes(p);
+ p.run().waitUntilFinish();
+ }
+
+ public static void main(String[] args) {
+ PipelineOptions options =
+ PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
+ runBigQueryTornadoes(options);
}
static class LogOutput<T> extends DoFn<T, T> {
private final String prefix;
- LogOutput() {
- this.prefix = "Processing element";
- }
-
LogOutput(String prefix) {
this.prefix = prefix;
}
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- LOG.info(prefix + ": {}", c.element());
+ public void processElement(ProcessContext c) {
+ LOG.info(prefix + c.element());
+ c.output(c.element());
}
}
-}
\ No newline at end of file
+}
diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py
index 6204635c630..fbb1e1e302d 100644
--- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py
+++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py
@@ -16,8 +16,10 @@
# beam-playground:
# name: read-query
-# description: TextIO read query example.
+# description: BigQueryIO read query example.
# multifile: false
+# never_run: true
+# always_run: true
# context_line: 34
# categories:
# - Quickstart
@@ -26,39 +28,43 @@
# - hellobeam
import argparse
+import os
+import warnings
+
import apache_beam as beam
-from apache_beam.io import ReadFromText
-from apache_beam.io import WriteToBigQuery
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions
+from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery
+
+class WeatherData:
+ def __init__(self, station_number, wban_number, year, month, day):
+ self.station_number = station_number
+ self.wban_number = wban_number
+ self.year = year
+ self.month = month
+ self.day = day
+ def __str__(self):
+ return f"Weather Data: Station {self.station_number} (WBAN {self.wban_number}), Date: {self.year}-{self.month}-{self.day}"
def run(argv=None):
parser = argparse.ArgumentParser()
- parser.add_argument('--input',
- dest='input',
- default='gs://bucket',
- help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
- pipeline_options.view_as(SetupOptions).save_main_session = True
+ pipeline_options.view_as(PipelineOptions)
- """
- (p | 'ReadTable' >> ReadFromBigQuery(query='SELECT * FROM project-id.dataset.table') - This part of the
- pipeline reads from a BigQuery table using a SQL query and processes the result. The ReadFromBigQuery(
- query='SELECT * FROM project-id.dataset.table') function is used to read from BigQuery. 'LogOutput' >>
- beam.Map(lambda elem: print(f"Processing element: {elem['field']}"))) - This part of the pipeline processes the
- PCollection and logs the output to the console. It prints the 'field' column from each row in the table.
- """
- with beam.Pipeline(options=pipeline_options) as p:
- (p #| 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT * FROM `project-id.dataset.table`')))
- # Each row is a dictionary where the keys are the BigQuery columns
- #| beam.Map(lambda elem: elem['field'])
- )
+ with beam.Pipeline(options=pipeline_options, argv=argv) as p:
+ (p
+ # | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query='select * from `apache-beam-testing.clouddataflow_samples.weather_stations`',use_standard_sql=True,
+ # method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
+ # | beam.combiners.Sample.FixedSizeGlobally(5)
+ # | beam.FlatMap(lambda line: line)
+ # | beam.Map(lambda element: WeatherData(element['station_number'],element['wban_number'],element['year'],element['month'],element['day']))
+ # | beam.Map(print)
+ )
if __name__ == '__main__':
- run()
+ run()
diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go
index 1751beb191e..ef2462f4de3 100644
--- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go
+++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go
@@ -28,6 +28,7 @@
// complexity: ADVANCED
// tags:
// - hellobeam
+
package main
import (
@@ -62,7 +63,7 @@ func main() {
s := p.Root()
project := "apache-beam-testing"
- // Build a PCollection<CommentRow> by querying BigQuery.
+ // Build a PCollection<Game> by querying BigQuery.
rows := bigqueryio.Read(s, project, "bigquery-public-data:baseball.schedules", reflect.TypeOf(Game{}))
fixedSizeLines := top.Largest(s, rows, 5, less)
diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java
index 835954382a9..206a0c0b8ee 100644
--- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java
+++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java
@@ -35,7 +35,11 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py
index d57d9a145b8..e89779e5a26 100644
--- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py
+++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py
@@ -16,7 +16,7 @@
# beam-playground:
# name: read-table
-# description: TextIO read table example.
+# description: BigQueryIO read table example.
# multifile: false
# never_run: true
# always_run: true
@@ -28,13 +28,8 @@
# - hellobeam
import argparse
-import os
-import warnings
-
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions
-#from google.cloud import bigquery
-from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery
class WeatherData:
def __init__(self, station_number, wban_number, year, month, day):
@@ -57,7 +52,7 @@ def run(argv=None):
with beam.Pipeline(options=pipeline_options, argv=argv) as p:
- (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations',
+ (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations',
method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
| beam.combiners.Sample.FixedSizeGlobally(5)
| beam.FlatMap(lambda line: line)