Posted to commits@beam.apache.org by ch...@apache.org on 2017/06/07 05:17:43 UTC

[1/2] beam git commit: [BEAM-2405] Override WriteToBigQuery to use the sink interface in the batch Dataflow runner

Repository: beam
Updated Branches:
  refs/heads/master b6347d02c -> 3cc4ff6d7


[BEAM-2405] Override WriteToBigQuery to use the sink interface in the batch Dataflow runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e641997a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e641997a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e641997a

Branch: refs/heads/master
Commit: e641997affc378ec0337d5ac19d8677cba0d0933
Parents: b6347d0
Author: Sourabh Bajaj <so...@google.com>
Authored: Tue Jun 6 19:49:54 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Jun 6 22:17:05 2017 -0700

----------------------------------------------------------------------
 .../examples/cookbook/bigquery_tornadoes.py       | 11 +++++------
 sdks/python/apache_beam/io/gcp/bigquery.py        |  2 +-
 .../runners/dataflow/dataflow_runner.py           | 18 ++++++++++++++++++
 3 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e641997a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index d3b216e..1ca49c5 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -83,12 +83,11 @@ def run(argv=None):
 
     # Write the output using a "Write" transform that has side effects.
     # pylint: disable=expression-not-assigned
-    counts | 'write' >> beam.io.Write(
-        beam.io.BigQuerySink(
-            known_args.output,
-            schema='month:INTEGER, tornado_count:INTEGER',
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
+    counts | 'Write' >> beam.io.WriteToBigQuery(
+        known_args.output,
+        schema='month:INTEGER, tornado_count:INTEGER',
+        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
 
     # Run the pipeline (all operations are deferred until run() is called).
 

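With this change the example writes through beam.io.WriteToBigQuery directly
instead of wrapping beam.io.BigQuerySink in beam.io.Write. For context, a
minimal sketch of the same pattern in a standalone pipeline; the table spec
'my-project:my_dataset.monthly_counts' is a placeholder, not part of this
commit:

    import apache_beam as beam

    p = beam.Pipeline()
    counts = p | 'Create' >> beam.Create([{'month': 1, 'tornado_count': 5}])
    # Placeholder table spec; the dispositions below create the table if
    # needed and truncate any existing rows before writing.
    # pylint: disable=expression-not-assigned
    counts | 'Write' >> beam.io.WriteToBigQuery(
        'my-project:my_dataset.monthly_counts',
        schema='month:INTEGER, tornado_count:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    p.run()
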
http://git-wip-us.apache.org/repos/asf/beam/blob/e641997a/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9069f73..da8be68 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1299,7 +1299,7 @@ class WriteToBigQuery(PTransform):
         create_disposition=self.create_disposition,
         write_disposition=self.write_disposition,
         client=self.test_client)
-    return pcoll | 'Write to BigQuery' >> ParDo(bigquery_write_fn)
+    return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn)
 
   def display_data(self):
     res = {}

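The renamed step label lives in WriteToBigQuery.expand(), which follows the
usual composite-transform pattern: expand() receives the input PCollection
and returns it piped through a named sub-transform. A simplified sketch of
that pattern, using a stand-in DoFn rather than the real BigQueryWriteFn:

    import apache_beam as beam

    class _StandInWriteFn(beam.DoFn):
        """Stand-in for BigQueryWriteFn; just passes rows through."""
        def process(self, element):
            yield element

    class MyWrite(beam.PTransform):
        def expand(self, pcoll):
            # The string label (here 'Write', 'WriteToBigQuery' above)
            # names the step in the pipeline graph and monitoring UI.
            return pcoll | 'Write' >> beam.ParDo(_StandInWriteFn())
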
http://git-wip-us.apache.org/repos/asf/beam/blob/e641997a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 62cea33..3fc8983 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -27,6 +27,7 @@ import time
 import traceback
 import urllib
 
+import apache_beam as beam
 from apache_beam import error
 from apache_beam import coders
 from apache_beam import pvalue
@@ -378,6 +379,23 @@ class DataflowRunner(PipelineRunner):
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
 
+  def apply_WriteToBigQuery(self, transform, pcoll):
+    standard_options = pcoll.pipeline._options.view_as(StandardOptions)
+    if standard_options.streaming:
+      if (transform.write_disposition ==
+          beam.io.BigQueryDisposition.WRITE_TRUNCATE):
+        raise RuntimeError('Can not use write truncation mode in streaming')
+      return self.apply_PTransform(transform, pcoll)
+    else:
+      return pcoll  | 'WriteToBigQuery' >> beam.io.Write(
+          beam.io.BigQuerySink(
+              transform.table_reference.tableId,
+              transform.table_reference.datasetId,
+              transform.table_reference.projectId,
+              transform.schema,
+              transform.create_disposition,
+              transform.write_disposition))
+
   def apply_GroupByKey(self, transform, pcoll):
     # Infer coder of parent.
     #

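DataflowRunner picks up apply_WriteToBigQuery through the runner override
hook: when a transform is applied, the runner looks for an
apply_<TransformClassName> method and calls it instead of the generic
apply_PTransform, which is also how the apply_GroupByKey shown above takes
effect. A simplified sketch of that dispatch, not the actual code of Beam's
PipelineRunner base class:

    class SketchRunner(object):
        # Simplified illustration of the apply_<ClassName> dispatch; the
        # real dispatch lives in Beam's PipelineRunner base class.
        def apply(self, transform, pvalueish):
            for cls in transform.__class__.__mro__:
                override = getattr(self, 'apply_%s' % cls.__name__, None)
                if override is not None:
                    return override(transform, pvalueish)
            return self.apply_PTransform(transform, pvalueish)

        def apply_PTransform(self, transform, pvalueish):
            return transform.expand(pvalueish)

So in batch mode the override replaces WriteToBigQuery with the native
BigQuerySink, while in streaming mode it falls back to apply_PTransform and
rejects WRITE_TRUNCATE, which the streaming path does not support.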

[2/2] beam git commit: This closes #3306

Posted by ch...@apache.org.
This closes #3306


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3cc4ff6d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3cc4ff6d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3cc4ff6d

Branch: refs/heads/master
Commit: 3cc4ff6d72cf74e91b0a0d9cdd4277288958c242
Parents: b6347d0 e641997
Author: chamikara@google.com <ch...@google.com>
Authored: Tue Jun 6 22:17:18 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Jun 6 22:17:18 2017 -0700

----------------------------------------------------------------------
 .../examples/cookbook/bigquery_tornadoes.py       | 11 +++++------
 sdks/python/apache_beam/io/gcp/bigquery.py        |  2 +-
 .../runners/dataflow/dataflow_runner.py           | 18 ++++++++++++++++++
 3 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------