You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/06/07 05:17:43 UTC
[1/2] beam git commit: [BEAM-2405] Override to sink interface in the batch dataflow BQ
Repository: beam
Updated Branches:
refs/heads/master b6347d02c -> 3cc4ff6d7
[BEAM-2405] Override to sink interface in the batch dataflow BQ
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e641997a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e641997a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e641997a
Branch: refs/heads/master
Commit: e641997affc378ec0337d5ac19d8677cba0d0933
Parents: b6347d0
Author: Sourabh Bajaj <so...@google.com>
Authored: Tue Jun 6 19:49:54 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Jun 6 22:17:05 2017 -0700
----------------------------------------------------------------------
.../examples/cookbook/bigquery_tornadoes.py | 11 +++++------
sdks/python/apache_beam/io/gcp/bigquery.py | 2 +-
.../runners/dataflow/dataflow_runner.py | 18 ++++++++++++++++++
3 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e641997a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index d3b216e..1ca49c5 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -83,12 +83,11 @@ def run(argv=None):
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
- counts | 'write' >> beam.io.Write(
- beam.io.BigQuerySink(
- known_args.output,
- schema='month:INTEGER, tornado_count:INTEGER',
- create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
+ counts | 'Write' >> beam.io.WriteToBigQuery(
+ known_args.output,
+ schema='month:INTEGER, tornado_count:INTEGER',
+ create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
# Run the pipeline (all operations are deferred until run() is called).
http://git-wip-us.apache.org/repos/asf/beam/blob/e641997a/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9069f73..da8be68 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1299,7 +1299,7 @@ class WriteToBigQuery(PTransform):
create_disposition=self.create_disposition,
write_disposition=self.write_disposition,
client=self.test_client)
- return pcoll | 'Write to BigQuery' >> ParDo(bigquery_write_fn)
+ return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn)
def display_data(self):
res = {}
http://git-wip-us.apache.org/repos/asf/beam/blob/e641997a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 62cea33..3fc8983 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -27,6 +27,7 @@ import time
import traceback
import urllib
+import apache_beam as beam
from apache_beam import error
from apache_beam import coders
from apache_beam import pvalue
@@ -378,6 +379,23 @@ class DataflowRunner(PipelineRunner):
PropertyNames.ENCODING: step.encoding,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+ def apply_WriteToBigQuery(self, transform, pcoll):
+ standard_options = pcoll.pipeline._options.view_as(StandardOptions)
+ if standard_options.streaming:
+ if (transform.write_disposition ==
+ beam.io.BigQueryDisposition.WRITE_TRUNCATE):
+ raise RuntimeError('Can not use write truncation mode in streaming')
+ return self.apply_PTransform(transform, pcoll)
+ else:
+ return pcoll | 'WriteToBigQuery' >> beam.io.Write(
+ beam.io.BigQuerySink(
+ transform.table_reference.tableId,
+ transform.table_reference.datasetId,
+ transform.table_reference.projectId,
+ transform.schema,
+ transform.create_disposition,
+ transform.write_disposition))
+
def apply_GroupByKey(self, transform, pcoll):
# Infer coder of parent.
#
[2/2] beam git commit: This closes #3306
Posted by ch...@apache.org.
This closes #3306
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3cc4ff6d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3cc4ff6d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3cc4ff6d
Branch: refs/heads/master
Commit: 3cc4ff6d72cf74e91b0a0d9cdd4277288958c242
Parents: b6347d0 e641997
Author: chamikara@google.com <ch...@google.com>
Authored: Tue Jun 6 22:17:18 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Jun 6 22:17:18 2017 -0700
----------------------------------------------------------------------
.../examples/cookbook/bigquery_tornadoes.py | 11 +++++------
sdks/python/apache_beam/io/gcp/bigquery.py | 2 +-
.../runners/dataflow/dataflow_runner.py | 18 ++++++++++++++++++
3 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------