You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/06/05 21:19:59 UTC

[beam] branch master updated: [BEAM-7437] BQ integration test for streaming inserts in streaming

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e42028b  [BEAM-7437] BQ integration test for streaming inserts in streaming
     new 2b74933  Merge pull request #8748 from ttanay/it-streaming-inserts
e42028b is described below

commit e42028b8fd81c50af81eb36f2422de649a19bdb8
Author: ttanay <tt...@gmail.com>
AuthorDate: Mon Jun 3 01:04:18 2019 +0530

    [BEAM-7437] BQ integration test for streaming inserts in streaming
---
 sdks/python/apache_beam/io/gcp/bigquery_test.py | 74 +++++++++++++++++++++++++
 sdks/python/build.gradle                        |  2 +
 2 files changed, 76 insertions(+)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 380a4aa..705af67 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -40,7 +40,9 @@ from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
 from apache_beam.io.gcp.tests.bigquery_matcher import BigQueryTableMatcher
 from apache_beam.options import value_provider
+from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms.display import DisplayData
@@ -617,6 +619,78 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
       assert_that(r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
                   equal_to([(full_output_table_1, bad_record)]))
 
+  @attr('IT')
+  def test_multiple_destinations_transform_streaming(self):
+    if isinstance(self.test_pipeline.runner, TestDataflowRunner):
+      self.skipTest("TestStream is not supported on TestDataflowRunner")
+    output_table_1 = '%s%s_streaming' % (self.output_table, 1)
+    output_table_2 = '%s%s_streaming' % (self.output_table, 2)
+
+    full_output_table_1 = '%s:%s' % (self.project, output_table_1)
+    full_output_table_2 = '%s:%s' % (self.project, output_table_2)
+
+    schema1 = {'fields': [
+        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+    schema2 = {'fields': [
+        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'foundation', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+    bad_record = {'language': 1, 'manguage': 2}
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT name, language FROM %s" % output_table_1,
+            data=[(d['name'], d['language'])
+                  for d in _ELEMENTS
+                  if 'language' in d]),
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT name, foundation FROM %s" % output_table_2,
+            data=[(d['name'], d['foundation'])
+                  for d in _ELEMENTS
+                  if 'foundation' in d])]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=hc.all_of(*pipeline_verifiers),
+        experiments='use_beam_bq_sink')
+
+    with beam.Pipeline(argv=args) as p:
+      _SIZE = len(_ELEMENTS)
+      test_stream = (TestStream()
+                     .advance_watermark_to(0)
+                     .add_elements(_ELEMENTS[:_SIZE//2])
+                     .advance_watermark_to(100)
+                     .add_elements(_ELEMENTS[_SIZE//2:])
+                     .advance_watermark_to_infinity())
+      input = p | test_stream
+
+      schema_table_pcv = beam.pvalue.AsDict(
+          p | "MakeSchemas" >> beam.Create([(full_output_table_1, schema1),
+                                            (full_output_table_2, schema2)]))
+
+      table_record_pcv = beam.pvalue.AsDict(
+          p | "MakeTables" >> beam.Create([('table1', full_output_table_1),
+                                           ('table2', full_output_table_2)]))
+
+      input2 = p | "Broken record" >> beam.Create([bad_record])
+
+      input = (input, input2) | beam.Flatten()
+
+      r = (input
+           | "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery(
+               table=lambda x, tables: (tables['table1']
+                                        if 'language' in x
+                                        else tables['table2']),
+               table_side_inputs=(table_record_pcv,),
+               schema=lambda dest, table_map: table_map.get(dest, None),
+               schema_side_inputs=(schema_table_pcv,),
+               method='STREAMING_INSERTS'))
+
+      assert_that(r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
+                  equal_to([(full_output_table_1, bad_record)]))
+
   def tearDown(self):
     request = bigquery.BigqueryDatasetsDeleteRequest(
         projectId=self.project, datasetId=self.dataset_id,
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index 2afddca..db7118f 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -120,6 +120,8 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
     def tests = [
         "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
         "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
+        "apache_beam.io.gcp.bigquery_test:BigQueryStreamingInsertTransformIntegrationTests\
+.test_multiple_destinations_transform_streaming",
     ]
     def streamingTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
     def argMap = ["runner": "TestDirectRunner",