You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/12 14:47:50 UTC
[beam] branch master updated: [BEAM-6619] [BEAM-6593] Add pubsub integration tests to postcommit
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 59df426 [BEAM-6619] [BEAM-6593] Add pubsub integration tests to postcommit
new 9bca477 Merge pull request #8016 from Juta/it-tests
59df426 is described below
commit 59df4260240ced1ab978e2de015e6bbfe2cbc063
Author: Juta <ju...@hotmail.com>
AuthorDate: Fri Mar 8 11:01:29 2019 +0100
[BEAM-6619] [BEAM-6593] Add pubsub integration tests to postcommit
---
.../apache_beam/io/gcp/pubsub_integration_test.py | 23 +++++++++++-----------
.../apache_beam/io/gcp/pubsub_it_pipeline.py | 4 ++--
sdks/python/apache_beam/io/parquetio_it_test.py | 2 +-
sdks/python/test-suites/dataflow/py3/build.gradle | 3 +++
sdks/python/test-suites/direct/py3/build.gradle | 3 +++
5 files changed, 21 insertions(+), 14 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 67237ba..c8a743e 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -56,39 +56,39 @@ class PubSubIntegrationTest(unittest.TestCase):
# label_ids, nor writing timestamp attributes. Once these features exist,
# TestDirectRunner and TestDataflowRunner should behave identically.
'TestDirectRunner': [
- PubsubMessage('data001', {}),
+ PubsubMessage(b'data001', {}),
# For those elements that have the TIMESTAMP_ATTRIBUTE attribute, the
# IT pipeline writes back the timestamp of each element (as reported
# by Beam), as a TIMESTAMP_ATTRIBUTE + '_out' attribute.
- PubsubMessage('data002', {
+ PubsubMessage(b'data002', {
TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
}),
],
'TestDataflowRunner': [
# Use ID_LABEL attribute to deduplicate messages with the same ID.
- PubsubMessage('data001', {ID_LABEL: 'foo'}),
- PubsubMessage('data001', {ID_LABEL: 'foo'}),
- PubsubMessage('data001', {ID_LABEL: 'foo'}),
+ PubsubMessage(b'data001', {ID_LABEL: 'foo'}),
+ PubsubMessage(b'data001', {ID_LABEL: 'foo'}),
+ PubsubMessage(b'data001', {ID_LABEL: 'foo'}),
# For those elements that have the TIMESTAMP_ATTRIBUTE attribute, the
# IT pipeline writes back the timestamp of each element (as reported
# by Beam), as a TIMESTAMP_ATTRIBUTE + '_out' attribute.
- PubsubMessage('data002', {
+ PubsubMessage(b'data002', {
TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
})
],
}
EXPECTED_OUTPUT_MESSAGES = {
'TestDirectRunner': [
- PubsubMessage('data001-seen', {'processed': 'IT'}),
- PubsubMessage('data002-seen', {
+ PubsubMessage(b'data001-seen', {'processed': 'IT'}),
+ PubsubMessage(b'data002-seen', {
TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
'processed': 'IT',
}),
],
'TestDataflowRunner': [
- PubsubMessage('data001-seen', {'processed': 'IT'}),
- PubsubMessage('data002-seen', {
+ PubsubMessage(b'data001-seen', {'processed': 'IT'}),
+ PubsubMessage(b'data002-seen', {
TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
'processed': 'IT',
}),
@@ -139,7 +139,8 @@ class PubSubIntegrationTest(unittest.TestCase):
state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
expected_messages = self.EXPECTED_OUTPUT_MESSAGES[self.runner_name]
if not with_attributes:
- expected_messages = [pubsub_msg.data for pubsub_msg in expected_messages]
+ expected_messages = [pubsub_msg.data.decode('utf-8')
+ for pubsub_msg in expected_messages]
if self.runner_name == 'TestDirectRunner':
strip_attributes = None
else:
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py b/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py
index f827148..6862bf8 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py
@@ -64,14 +64,14 @@ def run_pipeline(argv, with_attributes, id_label, timestamp_attribute):
timestamp_attribute=timestamp_attribute)
def add_attribute(msg, timestamp=beam.DoFn.TimestampParam):
- msg.data += '-seen'
+ msg.data += b'-seen'
msg.attributes['processed'] = 'IT'
if timestamp_attribute in msg.attributes:
msg.attributes[timestamp_attribute + '_out'] = timestamp.to_rfc3339()
return msg
def modify_data(data):
- return data + '-seen'
+ return data + b'-seen'
if with_attributes:
output = messages | 'add_attribute' >> beam.Map(add_attribute)
diff --git a/sdks/python/apache_beam/io/parquetio_it_test.py b/sdks/python/apache_beam/io/parquetio_it_test.py
index ba4204c..f66c05f 100644
--- a/sdks/python/apache_beam/io/parquetio_it_test.py
+++ b/sdks/python/apache_beam/io/parquetio_it_test.py
@@ -86,7 +86,7 @@ class TestParquetIT(unittest.TestCase):
@staticmethod
def _count_verifier(init_size, data_size, x):
- name, count = x[0], x[1]
+ name, count = x[0].decode('utf-8'), x[1]
counter = Counter(
[string.ascii_uppercase[x%26] for x in range(0, data_size*4, 4)]
)
diff --git a/sdks/python/test-suites/dataflow/py3/build.gradle b/sdks/python/test-suites/dataflow/py3/build.gradle
index b7a2ec0..35a201e 100644
--- a/sdks/python/test-suites/dataflow/py3/build.gradle
+++ b/sdks/python/test-suites/dataflow/py3/build.gradle
@@ -44,6 +44,9 @@ task postCommitIT(dependsOn: ['sdist', 'installGcpTest']) {
"apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest.test_copy_batch_rewrite_token",
"apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest.test_copy_kms",
"apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest.test_copy_rewrite_token",
+ "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest.test_streaming_data_only",
+ "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest.test_streaming_with_attributes",
+ "apache_beam.io.parquetio_it_test:TestParquetIT.test_parquetio_it",
]
def testOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
def cmdArgs = project.mapToArgString([
diff --git a/sdks/python/test-suites/direct/py3/build.gradle b/sdks/python/test-suites/direct/py3/build.gradle
index 4e68446..8720015 100644
--- a/sdks/python/test-suites/direct/py3/build.gradle
+++ b/sdks/python/test-suites/direct/py3/build.gradle
@@ -38,6 +38,9 @@ task postCommitIT(dependsOn: 'installGcpTest') {
"apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest.test_copy_batch_rewrite_token",
"apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest.test_copy_kms",
"apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest.test_copy_rewrite_token",
+ "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest.test_streaming_data_only",
+ "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest.test_streaming_with_attributes",
+ "apache_beam.io.parquetio_it_test:TestParquetIT.test_parquetio_it",
]
def testOpts = [
"--tests=${batchTests.join(',')}",