Posted to issues@beam.apache.org by "Ananda Prasad Inguva (Jira)" <ji...@apache.org> on 2021/11/10 17:07:00 UTC
[jira] [Updated] (BEAM-13218) apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest.test_streaming_with_attributes is failing
[ https://issues.apache.org/jira/browse/BEAM-13218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ananda Prasad Inguva updated BEAM-13218:
----------------------------------------
Summary: apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest.test_streaming_with_attributes is failing (was: PubSubIntegrationTest is failing)
> apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest.test_streaming_with_attributes is failing
> ----------------------------------------------------------------------------------------------------------
>
> Key: BEAM-13218
> URL: https://issues.apache.org/jira/browse/BEAM-13218
> Project: Beam
> Issue Type: Bug
> Components: test-failures
> Reporter: Ananda Prasad Inguva
> Priority: P1
> Labels: currently-failing, flaky
>
> The integration test test_streaming_with_attributes is failing in the Python 3.7 and Python 3.8 post-commit jobs:
> https://ci-beam.apache.org/job/beam_PostCommit_Python37/4496/testReport/junit/apache_beam.io.gcp.pubsub_integration_test/PubSubIntegrationTest/test_streaming_with_attributes/
> https://ci-beam.apache.org/job/beam_PostCommit_Python38/1893/testReport/junit/apache_beam.io.gcp.pubsub_integration_test/PubSubIntegrationTest/test_streaming_with_attributes/
> {code:java}
> Error Message
> AssertionError:
> Expected: (Test pipeline expected terminated in state: RUNNING and Expected 4 messages.)
>      but: Expected 4 messages. Got 4 messages. Diffs (item, count):
>   Expected but not in actual: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT'}), 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}), 1)])
>   Unexpected: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1)])
>   Stripped attributes: ['id', 'timestamp']
>
> Stacktrace
> self = <apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest testMethod=test_streaming_with_attributes>
>
>     @pytest.mark.it_postcommit
>     def test_streaming_with_attributes(self):
> >     self._test_streaming(with_attributes=True)
>
> apache_beam/io/gcp/pubsub_integration_test.py:213:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> apache_beam/io/gcp/pubsub_integration_test.py:205: in _test_streaming
>     timestamp_attribute=self.TIMESTAMP_ATTRIBUTE)
> apache_beam/io/gcp/pubsub_it_pipeline.py:93: in run_pipeline
>     result = p.run()
> apache_beam/pipeline.py:573: in run
>     return self.runner.run_pipeline(self, self._options)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
>
> self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7fa1d1561950>
> pipeline = <apache_beam.pipeline.Pipeline object at 0x7fa1d1561750>
> options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7fa1d15614d0>
>
>     def run_pipeline(self, pipeline, options):
>       """Execute test pipeline and verify test matcher"""
>       test_options = options.view_as(TestOptions)
>       on_success_matcher = test_options.on_success_matcher
>       wait_duration = test_options.wait_until_finish_duration
>       is_streaming = options.view_as(StandardOptions).streaming
>
>       # [BEAM-1889] Do not send this to remote workers also, there is no need to
>       # send this option to remote executors.
>       test_options.on_success_matcher = None
>
>       self.result = super().run_pipeline(pipeline, options)
>       if self.result.has_job:
>         # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
>         # in some cases.
>         print('Worker logs: %s' % self.build_console_url(options))
>
>       try:
>         self.wait_until_in_state(PipelineState.RUNNING)
>
>         if is_streaming and not wait_duration:
>           _LOGGER.warning('Waiting indefinitely for streaming job.')
>         self.result.wait_until_finish(duration=wait_duration)
>
>         if on_success_matcher:
>           from hamcrest import assert_that as hc_assert_that
> >         hc_assert_that(self.result, pickler.loads(on_success_matcher))
> E         AssertionError:
> E         Expected: (Test pipeline expected terminated in state: RUNNING and Expected 4 messages.)
> E              but: Expected 4 messages. Got 4 messages. Diffs (item, count):
> E           Expected but not in actual: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT'}), 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}), 1)])
> E           Unexpected: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1)])
> E           Stripped attributes: ['id', 'timestamp']
>
> apache_beam/runners/dataflow/test_dataflow_runner.py:68: AssertionError {code}
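
The diff in the assertion above suggests that two of the four received messages arrived without a 'timestamp' attribute, so the matcher recorded an error marker in its place and the messages no longer compared equal to the expected ones. The following is a minimal sketch of that comparison, not Beam's implementation: it uses plain (data, attributes) tuples instead of PubsubMessage, the helper names strip_attributes and diff_messages are hypothetical, and it assumes the matcher replaces a missing stripped attribute with the marker string shown in the error message.

```python
from collections import Counter

# Marker string copied from the error message in the report.
MISSING = 'PubSubMessageMatcher error: expected attribute not found.'


def strip_attributes(attributes, to_strip):
    """Return a copy of attributes with each name in to_strip removed.

    Assumption: a name that is absent is replaced by the MISSING marker,
    so an absent attribute surfaces as a mismatch instead of passing.
    """
    result = dict(attributes)
    for name in to_strip:
        if name in result:
            del result[name]
        else:
            result[name] = MISSING
    return result


def diff_messages(expected, actual):
    """Multiset diff of (data, attributes) pairs: (missing, unexpected)."""
    def key(message):
        data, attributes = message
        return (data, tuple(sorted(attributes.items())))

    expected_counts = Counter(key(m) for m in expected)
    actual_counts = Counter(key(m) for m in actual)
    return expected_counts - actual_counts, actual_counts - expected_counts


expected = [(b'data001-seen', {'processed': 'IT'})]
# Hypothetical received message that carries 'id' but no 'timestamp'.
received = [(b'data001-seen', {'processed': 'IT', 'id': 'abc'})]
actual = [(data, strip_attributes(attrs, ['id', 'timestamp']))
          for data, attrs in received]

missing, unexpected = diff_messages(expected, actual)
print('Expected but not in actual:', dict(missing))
print('Unexpected:', dict(unexpected))
```

Under these assumptions the message count matches (one expected, one received) while both diffs are non-empty, which is the same shape as "Expected 4 messages. Got 4 messages." in the report. It points at the 'timestamp' attribute not being present on the messages read back, rather than at lost messages.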