You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/06/05 21:19:59 UTC
[beam] branch master updated: [BEAM-7437] BQ integration test for streaming inserts in streaming
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e42028b [BEAM-7437] BQ integration test for streaming inserts in streaming
new 2b74933 Merge pull request #8748 from ttanay/it-streaming-inserts
e42028b is described below
commit e42028b8fd81c50af81eb36f2422de649a19bdb8
Author: ttanay <tt...@gmail.com>
AuthorDate: Mon Jun 3 01:04:18 2019 +0530
[BEAM-7437] BQ integration test for streaming inserts in streaming
---
sdks/python/apache_beam/io/gcp/bigquery_test.py | 74 +++++++++++++++++++++++++
sdks/python/build.gradle | 2 +
2 files changed, 76 insertions(+)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 380a4aa..705af67 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -40,7 +40,9 @@ from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigQueryTableMatcher
from apache_beam.options import value_provider
+from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.display import DisplayData
@@ -617,6 +619,78 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
assert_that(r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
equal_to([(full_output_table_1, bad_record)]))
+ @attr('IT')
+ def test_multiple_destinations_transform_streaming(self):
+ if isinstance(self.test_pipeline.runner, TestDataflowRunner):
+ self.skipTest("TestStream is not supported on TestDataflowRunner")
+ output_table_1 = '%s%s_streaming' % (self.output_table, 1)
+ output_table_2 = '%s%s_streaming' % (self.output_table, 2)
+
+ full_output_table_1 = '%s:%s' % (self.project, output_table_1)
+ full_output_table_2 = '%s:%s' % (self.project, output_table_2)
+
+ schema1 = {'fields': [
+ {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+ {'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+ schema2 = {'fields': [
+ {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+ {'name': 'foundation', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+ bad_record = {'language': 1, 'manguage': 2}
+
+ pipeline_verifiers = [
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT name, language FROM %s" % output_table_1,
+ data=[(d['name'], d['language'])
+ for d in _ELEMENTS
+ if 'language' in d]),
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT name, foundation FROM %s" % output_table_2,
+ data=[(d['name'], d['foundation'])
+ for d in _ELEMENTS
+ if 'foundation' in d])]
+
+ args = self.test_pipeline.get_full_options_as_args(
+ on_success_matcher=hc.all_of(*pipeline_verifiers),
+ experiments='use_beam_bq_sink')
+
+ with beam.Pipeline(argv=args) as p:
+ _SIZE = len(_ELEMENTS)
+ test_stream = (TestStream()
+ .advance_watermark_to(0)
+ .add_elements(_ELEMENTS[:_SIZE//2])
+ .advance_watermark_to(100)
+ .add_elements(_ELEMENTS[_SIZE//2:])
+ .advance_watermark_to_infinity())
+ input = p | test_stream
+
+ schema_table_pcv = beam.pvalue.AsDict(
+ p | "MakeSchemas" >> beam.Create([(full_output_table_1, schema1),
+ (full_output_table_2, schema2)]))
+
+ table_record_pcv = beam.pvalue.AsDict(
+ p | "MakeTables" >> beam.Create([('table1', full_output_table_1),
+ ('table2', full_output_table_2)]))
+
+ input2 = p | "Broken record" >> beam.Create([bad_record])
+
+ input = (input, input2) | beam.Flatten()
+
+ r = (input
+ | "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery(
+ table=lambda x, tables: (tables['table1']
+ if 'language' in x
+ else tables['table2']),
+ table_side_inputs=(table_record_pcv,),
+ schema=lambda dest, table_map: table_map.get(dest, None),
+ schema_side_inputs=(schema_table_pcv,),
+ method='STREAMING_INSERTS'))
+
+ assert_that(r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
+ equal_to([(full_output_table_1, bad_record)]))
+
def tearDown(self):
request = bigquery.BigqueryDatasetsDeleteRequest(
projectId=self.project, datasetId=self.dataset_id,
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index 2afddca..db7118f 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -120,6 +120,8 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
def tests = [
"apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
"apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
+ "apache_beam.io.gcp.bigquery_test:BigQueryStreamingInsertTransformIntegrationTests\
+.test_multiple_destinations_transform_streaming",
]
def streamingTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
def argMap = ["runner": "TestDirectRunner",