You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/12 14:47:50 UTC

[beam] branch master updated: [BEAM-6619] [BEAM-6593] Add pubsub integration tests to postcommit

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 59df426  [BEAM-6619] [BEAM-6593] Add pubsub integration tests to postcommit
     new 9bca477  Merge pull request #8016 from Juta/it-tests
59df426 is described below

commit 59df4260240ced1ab978e2de015e6bbfe2cbc063
Author: Juta <ju...@hotmail.com>
AuthorDate: Fri Mar 8 11:01:29 2019 +0100

    [BEAM-6619] [BEAM-6593] Add pubsub integration tests to postcommit
---
 .../apache_beam/io/gcp/pubsub_integration_test.py  | 23 +++++++++++-----------
 .../apache_beam/io/gcp/pubsub_it_pipeline.py       |  4 ++--
 sdks/python/apache_beam/io/parquetio_it_test.py    |  2 +-
 sdks/python/test-suites/dataflow/py3/build.gradle  |  3 +++
 sdks/python/test-suites/direct/py3/build.gradle    |  3 +++
 5 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 67237ba..c8a743e 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -56,39 +56,39 @@ class PubSubIntegrationTest(unittest.TestCase):
       # label_ids, nor writing timestamp attributes. Once these features exist,
       # TestDirectRunner and TestDataflowRunner should behave identically.
       'TestDirectRunner': [
-          PubsubMessage('data001', {}),
+          PubsubMessage(b'data001', {}),
           # For those elements that have the TIMESTAMP_ATTRIBUTE attribute, the
           # IT pipeline writes back the timestamp of each element (as reported
           # by Beam), as a TIMESTAMP_ATTRIBUTE + '_out' attribute.
-          PubsubMessage('data002', {
+          PubsubMessage(b'data002', {
               TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
           }),
       ],
       'TestDataflowRunner': [
           # Use ID_LABEL attribute to deduplicate messages with the same ID.
-          PubsubMessage('data001', {ID_LABEL: 'foo'}),
-          PubsubMessage('data001', {ID_LABEL: 'foo'}),
-          PubsubMessage('data001', {ID_LABEL: 'foo'}),
+          PubsubMessage(b'data001', {ID_LABEL: 'foo'}),
+          PubsubMessage(b'data001', {ID_LABEL: 'foo'}),
+          PubsubMessage(b'data001', {ID_LABEL: 'foo'}),
           # For those elements that have the TIMESTAMP_ATTRIBUTE attribute, the
           # IT pipeline writes back the timestamp of each element (as reported
           # by Beam), as a TIMESTAMP_ATTRIBUTE + '_out' attribute.
-          PubsubMessage('data002', {
+          PubsubMessage(b'data002', {
               TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
           })
       ],
   }
   EXPECTED_OUTPUT_MESSAGES = {
       'TestDirectRunner': [
-          PubsubMessage('data001-seen', {'processed': 'IT'}),
-          PubsubMessage('data002-seen', {
+          PubsubMessage(b'data001-seen', {'processed': 'IT'}),
+          PubsubMessage(b'data002-seen', {
               TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
               TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
               'processed': 'IT',
           }),
       ],
       'TestDataflowRunner': [
-          PubsubMessage('data001-seen', {'processed': 'IT'}),
-          PubsubMessage('data002-seen', {
+          PubsubMessage(b'data001-seen', {'processed': 'IT'}),
+          PubsubMessage(b'data002-seen', {
               TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
               'processed': 'IT',
           }),
@@ -139,7 +139,8 @@ class PubSubIntegrationTest(unittest.TestCase):
     state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
     expected_messages = self.EXPECTED_OUTPUT_MESSAGES[self.runner_name]
     if not with_attributes:
-      expected_messages = [pubsub_msg.data for pubsub_msg in expected_messages]
+      expected_messages = [pubsub_msg.data.decode('utf-8')
+                           for pubsub_msg in expected_messages]
     if self.runner_name == 'TestDirectRunner':
       strip_attributes = None
     else:
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py b/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py
index f827148..6862bf8 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py
@@ -64,14 +64,14 @@ def run_pipeline(argv, with_attributes, id_label, timestamp_attribute):
         timestamp_attribute=timestamp_attribute)
 
   def add_attribute(msg, timestamp=beam.DoFn.TimestampParam):
-    msg.data += '-seen'
+    msg.data += b'-seen'
     msg.attributes['processed'] = 'IT'
     if timestamp_attribute in msg.attributes:
       msg.attributes[timestamp_attribute + '_out'] = timestamp.to_rfc3339()
     return msg
 
   def modify_data(data):
-    return data + '-seen'
+    return data + b'-seen'
 
   if with_attributes:
     output = messages | 'add_attribute' >> beam.Map(add_attribute)
diff --git a/sdks/python/apache_beam/io/parquetio_it_test.py b/sdks/python/apache_beam/io/parquetio_it_test.py
index ba4204c..f66c05f 100644
--- a/sdks/python/apache_beam/io/parquetio_it_test.py
+++ b/sdks/python/apache_beam/io/parquetio_it_test.py
@@ -86,7 +86,7 @@ class TestParquetIT(unittest.TestCase):
 
   @staticmethod
   def _count_verifier(init_size, data_size, x):
-    name, count = x[0], x[1]
+    name, count = x[0].decode('utf-8'), x[1]
     counter = Counter(
         [string.ascii_uppercase[x%26] for x in range(0, data_size*4, 4)]
     )
diff --git a/sdks/python/test-suites/dataflow/py3/build.gradle b/sdks/python/test-suites/dataflow/py3/build.gradle
index b7a2ec0..35a201e 100644
--- a/sdks/python/test-suites/dataflow/py3/build.gradle
+++ b/sdks/python/test-suites/dataflow/py3/build.gradle
@@ -44,6 +44,9 @@ task postCommitIT(dependsOn: ['sdist', 'installGcpTest']) {
         "apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest.test_copy_batch_rewrite_token",
         "apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest.test_copy_kms",
         "apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest.test_copy_rewrite_token",
+        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest.test_streaming_data_only",
+        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest.test_streaming_with_attributes",
+        "apache_beam.io.parquetio_it_test:TestParquetIT.test_parquetio_it",
     ]
     def testOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
     def cmdArgs = project.mapToArgString([
diff --git a/sdks/python/test-suites/direct/py3/build.gradle b/sdks/python/test-suites/direct/py3/build.gradle
index 4e68446..8720015 100644
--- a/sdks/python/test-suites/direct/py3/build.gradle
+++ b/sdks/python/test-suites/direct/py3/build.gradle
@@ -38,6 +38,9 @@ task postCommitIT(dependsOn: 'installGcpTest') {
         "apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest.test_copy_batch_rewrite_token",
         "apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest.test_copy_kms",
         "apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest.test_copy_rewrite_token",
+        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest.test_streaming_data_only",
+        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest.test_streaming_with_attributes",
+        "apache_beam.io.parquetio_it_test:TestParquetIT.test_parquetio_it",
     ]
     def testOpts = [
         "--tests=${batchTests.join(',')}",