Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 23:24:07 UTC
[GitHub] [beam] damccorm opened a new issue, #21403: apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest.test_streaming_data_only is flaky
damccorm opened a new issue, #21403:
URL: https://github.com/apache/beam/issues/21403
One of the integration tests is failing in __PostCommits__:
[https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastBuild/testReport/apache_beam.io.gcp.pubsub_integration_test/PubSubIntegrationTest/test_streaming_data_only/](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastBuild/testReport/apache_beam.io.gcp.pubsub_integration_test/PubSubIntegrationTest/test_streaming_data_only/)
```
self = <apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest testMethod=test_streaming_data_only>

    @pytest.mark.it_postcommit
    def test_streaming_data_only(self):
>     self._test_streaming(with_attributes=False)

apache_beam/io/gcp/pubsub_integration_test.py:209:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/pubsub_integration_test.py:205: in _test_streaming
    timestamp_attribute=self.TIMESTAMP_ATTRIBUTE)
apache_beam/io/gcp/pubsub_it_pipeline.py:93: in run_pipeline
    result = p.run()
apache_beam/pipeline.py:573: in run
    return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7face48c99e8>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7face48c9278>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7face4891588>

    def run_pipeline(self, pipeline, options):
      """Execute test pipeline and verify test matcher"""
      test_options = options.view_as(TestOptions)
      on_success_matcher = test_options.on_success_matcher
      wait_duration = test_options.wait_until_finish_duration
      is_streaming = options.view_as(StandardOptions).streaming

      # [BEAM-1889] Do not send this to remote workers also, there is no need to
      # send this option to remote executors.
      test_options.on_success_matcher = None

      self.result = super().run_pipeline(pipeline, options)
      if self.result.has_job:
        # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
        # in some cases.
        print('Worker logs: %s' % self.build_console_url(options))

      try:
        self.wait_until_in_state(PipelineState.RUNNING)

        if is_streaming and not wait_duration:
          _LOGGER.warning('Waiting indefinitely for streaming job.')
        self.result.wait_until_finish(duration=wait_duration)

        if on_success_matcher:
          from hamcrest import assert_that as hc_assert_that
>         hc_assert_that(self.result, pickler.loads(on_success_matcher))
E         AssertionError:
E         Expected: (Test pipeline expected terminated in state: RUNNING and Expected 4 messages.)
E              but: Expected 4 messages. Got 0 messages. Diffs (item, count):
E           Expected but not in actual: dict_items([(b'data001-seen', 1), (b'data002-seen', 1), (b'data003\xab\xac-seen', 1), (b'data004\xab\xac-seen', 1)])
E           Unexpected: dict_items([])

apache_beam/runners/dataflow/test_dataflow_runner.py:68: AssertionError
```
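For anyone reading the `Diffs (item, count)` part of the assertion: it looks like a multiset comparison between the expected and the received message payloads. Below is a minimal sketch of that kind of comparison using `collections.Counter`. It is not the matcher code from Beam; `diff_messages` is a hypothetical helper written only to illustrate how the "Expected but not in actual" and "Unexpected" lists could be derived, with the payloads copied from the trace above.

```python
from collections import Counter


def diff_messages(expected, actual):
    """Return (missing, unexpected) as Counters mapping item -> count.

    Hypothetical helper for illustration only; not part of apache_beam.
    """
    expected_counts = Counter(expected)
    actual_counts = Counter(actual)
    # Counter subtraction keeps only positive counts, so each side of the
    # diff contains just the items that are over-represented on that side.
    missing = expected_counts - actual_counts
    unexpected = actual_counts - expected_counts
    return missing, unexpected


# Payloads taken from the assertion message in the trace.
expected = [
    b'data001-seen',
    b'data002-seen',
    b'data003\xab\xac-seen',
    b'data004\xab\xac-seen',
]
actual = []  # the failing run received no messages

missing, unexpected = diff_messages(expected, actual)
print('Expected but not in actual:', missing.items())
print('Unexpected:', unexpected.items())
```

With an empty `actual`, every expected payload ends up in `missing` and `unexpected` stays empty, which matches the shape of the failure: the output subscription had nothing to pull when the matcher ran, rather than containing wrong or duplicated messages.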
Imported from Jira [BEAM-13480](https://issues.apache.org/jira/browse/BEAM-13480). Original Jira may contain additional context.
Reported by: yeandy.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org