You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ri...@apache.org on 2023/06/26 21:31:30 UTC

[beam] branch master updated: [Tour of Beam] add work example (#27080)

This is an automated email from the ASF dual-hosted git repository.

riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 31cb00e7d87 [Tour of Beam] add work example (#27080)
31cb00e7d87 is described below

commit 31cb00e7d87080d4d085c367a5524d902fb2c902
Author: Abzal Tuganbay <ab...@gmail.com>
AuthorDate: Tue Jun 27 03:31:24 2023 +0600

    [Tour of Beam] add work example (#27080)
    
    * add work example
    
    * correct
    
    * correct tags for bigquery examples
    
    * correct read-query
    
    * correct read-query tag
    
    * correct imports
    
    * remove package
    
    * correct
    
    * fixed example name
    
    ---------
    
    Co-authored-by: mende1esmende1es <me...@gmail.cp>
    Co-authored-by: Oleh Borysevych <ol...@akvelon.com>
---
 .../io/big-query-io/read-query/go-example/main.go  | 74 ++++++++++++----------
 .../big-query-io/read-query/java-example/Task.java | 64 +++++++++----------
 .../big-query-io/read-query/python-example/task.py | 52 ++++++++-------
 .../io/big-query-io/read-table/go-example/main.go  |  3 +-
 .../big-query-io/read-table/java-example/Task.java |  6 +-
 .../big-query-io/read-table/python-example/task.py |  9 +--
 6 files changed, 107 insertions(+), 101 deletions(-)

diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go
index fec979ad7ed..49ab6057bac 100644
--- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go
+++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go
@@ -18,9 +18,11 @@
 
 // beam-playground:
 //   name: read-query
-//   description: BigQuery read query example.
+//   description: BigQueryIO read query example.
 //   multifile: false
-//   context_line: 40
+//   context_line: 42
+//   never_run: true
+//   always_run: true
 //   categories:
 //     - Quickstart
 //   complexity: ADVANCED
@@ -29,47 +31,49 @@
 package main
 
 import (
-	_ "context"
-	_ "flag"
-	_ "github.com/apache/beam/sdks/v2/go/pkg/beam"
-	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
-	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
-	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
-	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+	"context"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+
+	"cloud.google.com/go/bigquery"
 	internal_log "log"
-	_ "reflect"
+	"reflect"
 )
 
-// Define the data model: The CommentRow struct is defined, which models one row of HackerNews comments.
-//The bigquery tag in the struct field is used to map the struct field to the BigQuery column.
-type CommentRow struct {
-	Text string `bigquery:"text"`
+type Game struct {
+	GameID     bigquery.NullString `bigquery:"gameId"`
+	GameNumber bigquery.NullInt64  `bigquery:"gameNumber"`
+	SeasonID   bigquery.NullString `bigquery:"seasonId"`
+	Year       bigquery.NullInt64  `bigquery:"year"`
+	Type       bigquery.NullString `bigquery:"type"`
+	DayNight   bigquery.NullString `bigquery:"dayNight"`
+	Duration   bigquery.NullString `bigquery:"duration"`
 }
 
-// Construct the BigQuery query: A constant query is defined that selects the text column
-// from the bigquery-public-data.hacker_news.comments table for a certain time range.
-const query = `SELECT text
-FROM ` + "`bigquery-public-data.hacker_news.comments`" + `
-WHERE time_ts BETWEEN '2013-01-01' AND '2014-01-01'
-LIMIT 1000
-`
-
 func main() {
 	internal_log.Println("Running Task")
-	/*
-		ctx := context.Background()
-		p := beam.NewPipeline()
-		s := p.Root()
-		project := "tess-372508"
 
-		// Build a PCollection<CommentRow> by querying BigQuery.
-		rows := bigqueryio.Query(s, project, query,
-			reflect.TypeOf(CommentRow{}), bigqueryio.UseStandardSQL())
+	ctx := context.Background()
+	p := beam.NewPipeline()
+	s := p.Root()
+	project := "apache-beam-testing"
 
-		debug.Print(s, rows)
+	// Build a PCollection<CommentRow> by querying BigQuery.
+	rows := bigqueryio.Query(s, project, "select * from `bigquery-public-data.baseball.schedules`",
+		reflect.TypeOf(Game{}), bigqueryio.UseStandardSQL())
 
-		// Now that the pipeline is fully constructed, we execute it.
-		if err := beamx.Run(ctx, p); err != nil {
-			log.Exitf(ctx, "Failed to execute job: %v", err)
-		}*/
+	fixedSizeLines := top.Largest(s, rows, 5, less)
+
+	debug.Print(s, fixedSizeLines)
+	// Now that the pipeline is fully constructed, we execute it.
+	if err := beamx.Run(ctx, p); err != nil {
+		log.Exitf(ctx, "Failed to execute job: %v", err)
+	}
+}
+func less(a, b Game) bool {
+	return true
 }
diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java
index 256c70919ce..12c1fbcd9b4 100644
--- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java
+++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java
@@ -27,11 +27,10 @@
 //   tags:
 //     - hellobeam
 
+import com.google.api.services.bigquery.model.TableRow;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
-import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -40,52 +39,49 @@ import org.apache.beam.sdk.values.PCollection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Task {
 
+public class Task {
     private static final Logger LOG = LoggerFactory.getLogger(Task.class);
 
-    public static void main(String[] args) {
-        LOG.info("Running Task");
-        System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json");
-        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
-        options.setTempLocation("gs://bucket");
-        options.as(BigQueryOptions.class).setProject("project-id");
+    private static final String WEATHER_SAMPLES_QUERY =
+            "select * from `clouddataflow-readonly.samples.weather_stations`";
 
-        Pipeline pipeline = Pipeline.create(options);
+    public static void applyBigQueryTornadoes(Pipeline p) {
+        /*TypedRead<TableRow> bigqueryIO =
+                BigQueryIO.readTableRows()
+                        .fromQuery(WEATHER_SAMPLES_QUERY)
+                        .usingStandardSql();
 
-        // pCollection.apply(BigQueryIO.read(... - This part of the pipeline reads from a BigQuery table using a SQL query and stores the result in a PCollection.
-        // The BigQueryIO.read() function is used to read from BigQuery. It is configured with a lambda function to extract a field from each record.
-        // The .fromQuery("SELECT field FROM project-id.dataset.table")
-        // specifies the SQL query used to read from BigQuery. You should replace "field", "project-id", "dataset", and "table" with your specific field name, project id, dataset name, and table name, respectively.
-/*
-        PCollection<Double> pCollection = pipeline
-                .apply(BigQueryIO.read(
-                                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("field"))
-                        .fromQuery(
-                                "SELECT field FROM `project-id.dataset.table`")
-                        .usingStandardSql()
-                        .withCoder(DoubleCoder.of()));
-        pCollection
-                .apply("Log words", ParDo.of(new LogOutput<>()));
-*/
 
-        pipeline.run();
+        PCollection<TableRow> rowsFromBigQuery = p.apply(bigqueryIO);
+
+        rowsFromBigQuery
+                .apply(ParDo.of(new LogOutput<>("Result: ")));*/
+    }
+
+    public static void runBigQueryTornadoes(PipelineOptions options) {
+        Pipeline p = Pipeline.create(options);
+        applyBigQueryTornadoes(p);
+        p.run().waitUntilFinish();
+    }
+
+    public static void main(String[] args) {
+        PipelineOptions options =
+                PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
+        runBigQueryTornadoes(options);
     }
 
     static class LogOutput<T> extends DoFn<T, T> {
         private final String prefix;
 
-        LogOutput() {
-            this.prefix = "Processing element";
-        }
-
         LogOutput(String prefix) {
             this.prefix = prefix;
         }
 
         @ProcessElement
-        public void processElement(ProcessContext c) throws Exception {
-            LOG.info(prefix + ": {}", c.element());
+        public void processElement(ProcessContext c) {
+            LOG.info(prefix + c.element());
+            c.output(c.element());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py
index 6204635c630..fbb1e1e302d 100644
--- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py
+++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py
@@ -16,8 +16,10 @@
 
 # beam-playground:
 #   name: read-query
-#   description: TextIO read query example.
+#   description: BigQueryIO read query example.
 #   multifile: false
+#   never_run: true
+#   always_run: true
 #   context_line: 34
 #   categories:
 #     - Quickstart
@@ -26,39 +28,43 @@
 #     - hellobeam
 
 import argparse
+import os
+import warnings
+
 import apache_beam as beam
-from apache_beam.io import ReadFromText
-from apache_beam.io import WriteToBigQuery
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions
+from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery
+
+class WeatherData:
+    def __init__(self, station_number, wban_number, year, month, day):
+        self.station_number = station_number
+        self.wban_number = wban_number
+        self.year = year
+        self.month = month
+        self.day = day
 
+    def __str__(self):
+        return f"Weather Data: Station {self.station_number} (WBAN {self.wban_number}), Date: {self.year}-{self.month}-{self.day}"
 
 def run(argv=None):
     parser = argparse.ArgumentParser()
-    parser.add_argument('--input',
-                        dest='input',
-                        default='gs://bucket',
-                        help='Input file to process.')
 
     known_args, pipeline_args = parser.parse_known_args(argv)
 
     pipeline_options = PipelineOptions(pipeline_args)
-    pipeline_options.view_as(SetupOptions).save_main_session = True
+    pipeline_options.view_as(PipelineOptions)
 
-    """
-    (p | 'ReadTable' >> ReadFromBigQuery(query='SELECT * FROM project-id.dataset.table') - This part of the 
-    pipeline reads from a BigQuery table using a SQL query and processes the result. The ReadFromBigQuery(
-    query='SELECT * FROM project-id.dataset.table') function is used to read from BigQuery. 'LogOutput' >> 
-    beam.Map(lambda elem: print(f"Processing element: {elem['field']}"))) - This part of the pipeline processes the
-    PCollection and logs the output to the console. It prints the 'field' column from each row in the table. 
-    """
 
-    with beam.Pipeline(options=pipeline_options) as p:
-      (p #| 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT * FROM `project-id.dataset.table`')))
-         # Each row is a dictionary where the keys are the BigQuery columns
-         #| beam.Map(lambda elem: elem['field'])
-       )
+    with beam.Pipeline(options=pipeline_options, argv=argv) as p:
+      (p
+         # | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query='select * from `apache-beam-testing.clouddataflow_samples.weather_stations`',use_standard_sql=True,
+         #                                                    method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
+         # | beam.combiners.Sample.FixedSizeGlobally(5)
+         # | beam.FlatMap(lambda line: line)
+         # | beam.Map(lambda element: WeatherData(element['station_number'],element['wban_number'],element['year'],element['month'],element['day']))
+         # | beam.Map(print)
+         )
 
 
 if __name__ == '__main__':
-  run()
+    run()
diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go
index 1751beb191e..ef2462f4de3 100644
--- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go
+++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go
@@ -28,6 +28,7 @@
 //   complexity: ADVANCED
 //   tags:
 //     - hellobeam
+
 package main
 
 import (
@@ -62,7 +63,7 @@ func main() {
 	s := p.Root()
 	project := "apache-beam-testing"
 
-	// Build a PCollection<CommentRow> by querying BigQuery.
+	// Build a PCollection<Game> by querying BigQuery.
 	rows := bigqueryio.Read(s, project, "bigquery-public-data:baseball.schedules", reflect.TypeOf(Game{}))
 
 	fixedSizeLines := top.Largest(s, rows, 5, less)
diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java
index 835954382a9..206a0c0b8ee 100644
--- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java
+++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java
@@ -35,7 +35,11 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sample;
 import org.apache.beam.sdk.values.PCollection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py
index d57d9a145b8..e89779e5a26 100644
--- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py
+++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py
@@ -16,7 +16,7 @@
 
 # beam-playground:
 #   name: read-table
-#   description: TextIO read table example.
+#   description: BigQueryIO read table example.
 #   multifile: false
 #   never_run: true
 #   always_run: true
@@ -28,13 +28,8 @@
 #     - hellobeam
 
 import argparse
-import os
-import warnings
-
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions
-#from google.cloud import bigquery
-from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery
 
 class WeatherData:
     def __init__(self, station_number, wban_number, year, month, day):
@@ -57,7 +52,7 @@ def run(argv=None):
 
 
     with beam.Pipeline(options=pipeline_options, argv=argv) as p:
-        (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations',
+      (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations',
                                                             method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
          | beam.combiners.Sample.FixedSizeGlobally(5)
          | beam.FlatMap(lambda line: line)