Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/23 20:25:41 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery

chamikaramj commented on a change in pull request #12203:
URL: https://github.com/apache/beam/pull/12203#discussion_r459703269



##########
File path: CHANGES.md
##########
@@ -55,6 +55,9 @@
 
 * New overloads for BigtableIO.Read.withKeyRange() and BigtableIO.Read.withRowFilter()
   methods that take ValueProvider as a parameter (Java) ([BEAM-10283](https://issues.apache.org/jira/browse/BEAM-10283)).
+* The WriteToBigQuery transform (Python) in Dataflow Batch no longer relies on BigQuerySource by default. It relies on 

Review comment:
       Did you mean BigQuerySink?

##########
File path: sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
##########
@@ -227,6 +225,7 @@ def test_big_query_standard_sql_kms_key_native(self):
         'on_success_matcher': all_of(*pipeline_verifiers),
         'kms_key': kms_key,
         'native': True,
+        'experiments': 'use_dataflow_bq_sink',

Review comment:
       Can we just change this test to use the new sink?

##########
File path: CHANGES.md
##########
@@ -55,6 +55,9 @@
 
 * New overloads for BigtableIO.Read.withKeyRange() and BigtableIO.Read.withRowFilter()
   methods that take ValueProvider as a parameter (Java) ([BEAM-10283](https://issues.apache.org/jira/browse/BEAM-10283)).
+* The WriteToBigQuery transform (Python) in Dataflow Batch no longer relies on BigQuerySource by default. It relies on 
+  a new, fully-featured transform based on file loads into BigQuery. To revert the behavior to the old implementation,
+  you may use `--experiments=use_dataflow_bq_sink`.

Review comment:
       Maybe call this "use_legacy_bq_sink"?
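
       For reference, a minimal sketch (not part of the PR) of opting back
       into the old implementation with this experiment; the table name and
       schema below are placeholders:

           import apache_beam as beam
           from apache_beam.options.pipeline_options import PipelineOptions

           # Revert WriteToBigQuery to the old sink; the suggestion above is
           # to rename this experiment to 'use_legacy_bq_sink'.
           options = PipelineOptions(
               flags=[], experiments=['use_dataflow_bq_sink'])

           with beam.Pipeline(options=options) as p:
             (p
              | beam.Create([{'id': 1}])
              | beam.io.WriteToBigQuery(
                  'project:dataset.table',  # placeholder
                  schema='id:INTEGER'))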

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
##########
@@ -698,6 +698,7 @@ def test_gbk_translation(self):
   def test_write_bigquery_translation(self):
     runner = DataflowRunner()
 
+    self.default_properties.append('--experiments=use_dataflow_bq_sink')

Review comment:
       Why not run this using the new sink?
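
       For context, a short self-contained sketch of how a flag string like
       the one appended above is parsed and then queried by the runner:

           from apache_beam.options.pipeline_options import (
               DebugOptions, PipelineOptions)

           # Parse the same flag the test appends to default_properties.
           options = PipelineOptions(['--experiments=use_dataflow_bq_sink'])

           # lookup_experiment returns True when the bare experiment name is
           # present; this is what keeps the old native sink in use.
           assert options.view_as(DebugOptions).lookup_experiment(
               'use_dataflow_bq_sink')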

##########
File path: sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
##########
@@ -236,7 +236,10 @@ def enter_composite_transform(self, transform_node):
         self.visit_transform(transform_node)
 
       def visit_transform(self, transform_node):
-        if [o for o in self.outputs if o in transform_node.inputs]:
+        # Internal consumers of the outputs we're overriding are expected.

Review comment:
       Why did we have to add this explicit fail? I don't think we add such visitors for other transforms that do not produce output. Usually the succeeding transform just ends up being a no-op.
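
       For readers following along, a stripped-down sketch of the visitor
       pattern under discussion (simplified from the PR; the real base class
       lives in apache_beam.pipeline):

           from apache_beam.pipeline import PipelineVisitor

           class ConsumerCheckVisitor(PipelineVisitor):
             """Raises if any transform consumes one of the given outputs."""

             def __init__(self, outputs):
               self.outputs = outputs

             def visit_transform(self, transform_node):
               # Mirrors the original check: fail if this transform reads any
               # output of the replaced WriteToBigQuery.
               if [o for o in self.outputs if o in transform_node.inputs]:
                 raise ValueError(
                     'WriteToBigQuery was overridden; its outputs may not '
                     'be consumed by other transforms.')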

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
##########
@@ -749,12 +750,13 @@ def test_write_bigquery_failed_translation(self):
     """Tests that WriteToBigQuery cannot have any consumers if replaced."""
     runner = DataflowRunner()
 
-    with self.assertRaises(ValueError):
+    self.default_properties.append('--experiments=use_dataflow_bq_sink')

Review comment:
       Ditto.

##########
File path: sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
##########
@@ -236,7 +236,10 @@ def enter_composite_transform(self, transform_node):
         self.visit_transform(transform_node)
 
       def visit_transform(self, transform_node):
-        if [o for o in self.outputs if o in transform_node.inputs]:
+        # Internal consumers of the outputs we're overriding are expected.
+        # We only error out on non-internal consumers.
+        if ('BigQueryBatchFileLoads' not in transform_node.full_label and

Review comment:
       Can we use the experiment here? You can get hold of the PipelineOptions through the input.
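
       One possible reading of this suggestion, as a hedged sketch; the
       attribute path from an input PValue back to the PipelineOptions is
       internal and may differ between Beam versions:

           from apache_beam.options.pipeline_options import DebugOptions

           def _uses_native_bq_sink(transform_node):
             # PValues keep a reference to their Pipeline, whose options
             # carry the experiments (internal access, assumed here).
             options = transform_node.inputs[0].pipeline.options
             return bool(
                 options.view_as(DebugOptions).lookup_experiment(
                     'use_dataflow_bq_sink'))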

##########
File path: sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
##########
@@ -305,7 +303,8 @@ def test_big_query_new_types_native(self):
         'use_standard_sql': False,
         'native': True,
         'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
-        'on_success_matcher': all_of(*pipeline_verifiers)
+        'on_success_matcher': all_of(*pipeline_verifiers),
+        'experiments': 'use_dataflow_bq_sink',

Review comment:
       Ditto.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org