Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/23 20:10:29 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #12782: [BEAM-10950] Overriding Dataflow Native BQSource.

chamikaramj commented on a change in pull request #12782:
URL: https://github.com/apache/beam/pull/12782#discussion_r493864558



##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
##########
@@ -269,7 +260,9 @@ def test_biqquery_read_fn_api_fail(self):
           'apache_beam.io.gcp.bigquery.ReadFromBigQuery.*'):
         with Pipeline(remote_runner,
                       PipelineOptions(self.default_properties)) as p:
-          _ = p | beam.io.Read(beam.io.BigQuerySource('some.table'))
+          _ = p | beam.io.Read(
+              beam.io.BigQuerySource(

Review comment:
      Is there any value in maintaining these tests that explicitly check whether a non-default behavior fails for Runner v2?
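
For context, a minimal sketch of how such a guard test presumably reads in full, reassembled from the excerpt above; the assertRaisesRegex wrapper, the use_runner_v2 experiment flag, and the arguments on the truncated '+' line are assumptions, not text copied from the PR:

    def test_biqquery_read_fn_api_fail(self):
      remote_runner = DataflowRunner()
      self.default_properties.append('--experiments=use_runner_v2')
      # The native source is expected to be rejected on Runner v2, with an
      # error pointing users at the new ReadFromBigQuery transform.
      with self.assertRaisesRegex(
          Exception, 'apache_beam.io.gcp.bigquery.ReadFromBigQuery.*'):
        with Pipeline(remote_runner,
                      PipelineOptions(self.default_properties)) as p:
          _ = p | beam.io.Read(
              beam.io.BigQuerySource(
                  'some.table', use_dataflow_native_source=True))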

##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -890,6 +890,8 @@ def get_desired_chunk_size(total_size):
   def expand(self, pbegin):
     if isinstance(self.source, BoundedSource):
       return pbegin | _SDFBoundedSourceWrapper(self.source)
+    elif isinstance(self.source, ptransform.PTransform):
+      return pbegin.pipeline | self.source

Review comment:
       This looks pretty awkward but I get the idea. Please add a comment here explaining this.
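
A minimal sketch of the branch under review together with the kind of inline comment being requested; the comment wording and the elided remainder of the method are assumptions based on the excerpt, not the PR's actual text:

    def expand(self, pbegin):
      if isinstance(self.source, BoundedSource):
        return pbegin | _SDFBoundedSourceWrapper(self.source)
      elif isinstance(self.source, ptransform.PTransform):
        # The overridden native source is replaced by a composite PTransform
        # (e.g. ReadFromBigQuery). Read cannot expand a composite the way it
        # expands a BoundedSource, so it applies the wrapped transform to the
        # pipeline and returns that transform's output directly.
        return pbegin.pipeline | self.source
      # ... remaining legacy-source handling unchanged ...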

##########
File path: sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
##########
@@ -91,24 +91,16 @@ def run_bq_pipeline(argv=None):
         use_standard_sql=known_args.use_standard_sql,
         use_json_exports=known_args.use_json_exports,
         kms_key=kms_key)
-  if known_args.native:
-    _ = data | 'write' >> beam.io.Write(
-        beam.io.BigQuerySink(
-            known_args.output,
-            schema=table_schema,
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
-            kms_key=kms_key))
-  else:
-    temp_file_format = (
-        'NEWLINE_DELIMITED_JSON' if known_args.use_json_exports else 'AVRO')
-    _ = data | 'write' >> beam.io.WriteToBigQuery(
-        known_args.output,
-        schema=table_schema,
-        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-        write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
-        temp_file_format=temp_file_format,
-        kms_key=kms_key)
+
+  temp_file_format = (

Review comment:
      Does this update the sink as well? If so, please update the CL description accordingly.
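
For reference, the consolidated write path this change presumably leaves behind, reassembled from the removed else branch shown above; the content of the truncated '+' lines is an assumption:

    temp_file_format = (
        'NEWLINE_DELIMITED_JSON' if known_args.use_json_exports else 'AVRO')
    _ = data | 'write' >> beam.io.WriteToBigQuery(
        known_args.output,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
        temp_file_format=temp_file_format,
        kms_key=kms_key)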

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_test.py
##########
@@ -168,7 +168,8 @@ def test_invalid_json_neg_inf(self):
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQuerySource(unittest.TestCase):
   def test_display_data_item_on_validate_true(self):
-    source = beam.io.BigQuerySource('dataset.table', validate=True)
+    source = beam.io.BigQuerySource(

Review comment:
      I assume we have the same test coverage for the new source?
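
As an illustration of what mirrored coverage could look like, a hedged sketch only; the exact display-data items ReadFromBigQuery reports, and therefore the matchers to use, are assumptions rather than anything confirmed in this thread:

    import hamcrest as hc
    import apache_beam as beam
    from apache_beam.transforms.display import DisplayData
    from apache_beam.transforms.display_test import DisplayDataItemMatcher

    def test_display_data_item_on_validate_true_new_source(self):
      source = beam.io.ReadFromBigQuery(table='dataset.table', validate=True)
      dd = DisplayData.create_from(source)
      # Mirrors the native-source check above; 'validation' is assumed to be
      # the key the new source uses, as it is for the native source.
      expected_items = [DisplayDataItemMatcher('validation', True)]
      hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))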

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
##########
@@ -451,7 +451,9 @@ def test_read_from_table(self):
     client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
         jobComplete=True, rows=table_rows, schema=schema)
     actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
+    with beam.io.BigQuerySource(

Review comment:
      +1. Any reason why we can't use the new source here today?

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
##########
@@ -323,7 +316,9 @@ def test_no_group_by_key_directly_after_bigquery(self):
                          options=PipelineOptions(self.default_properties)) as p:
         # pylint: disable=expression-not-assigned
         p | beam.io.Read(
-            beam.io.BigQuerySource('dataset.faketable')) | beam.GroupByKey()
+            beam.io.BigQuerySource(
+                'dataset.faketable',
+                use_dataflow_native_source=True)) | beam.GroupByKey()

Review comment:
      I assume the state is the following:
  (1) We have full test coverage for the new source (which is the default).
  (2) We have additional tests that check the old native source, which can simply be removed whenever we discontinue support for that source.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org