Posted to issues@beam.apache.org by "Christophe Rodriguez (Jira)" <ji...@apache.org> on 2021/07/08 06:17:00 UTC

[jira] [Created] (BEAM-12586) Python Direct Runner doesn't support both streaming & non streaming sources

Christophe Rodriguez created BEAM-12586:
-------------------------------------------

             Summary: Python Direct Runner doesn't support both streaming & non streaming sources
                 Key: BEAM-12586
                 URL: https://issues.apache.org/jira/browse/BEAM-12586
             Project: Beam
          Issue Type: Bug
          Components: runner-direct
    Affects Versions: 2.30.0
            Reporter: Christophe Rodriguez


Please see the Stack Overflow discussion:

[https://stackoverflow.com/questions/68125864/transform-node-appliedptransform-was-not-replaced-as-expected-error-with-the-dir]

When I create a GCS source and a Pub/Sub source and flatten the two PCollections, the Direct Runner fails with an error raised while it rewrites the pipeline with its transform overrides.

Code example:
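{code:python}
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Assumed setup (not shown in the original): ReadFromPubSub is unbounded, so streaming is enabled.
with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    gcsEventsColl = (p | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log")
                       | 'convert to dict' >> beam.Map(json.loads))
    liveEventsColl = (p | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic")
                        | 'convert to dict2' >> beam.Map(json.loads))

    # Merge the bounded (GCS) and unbounded (Pub/Sub) branches.
    input_rec = (gcsEventsColl, liveEventsColl) | 'flatten' >> beam.Flatten()
{code}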
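The two 'convert to dict' steps in the example receive different input types: ReadFromText emits each line as str, while ReadFromPubSub (without with_attributes) emits each message payload as bytes. Since Python 3.6, json.loads accepts both, so the two branches should hand Flatten the same kind of element (plain dicts), which suggests the failure is not caused by the data itself. A quick standalone check, using a hypothetical sample record in place of the real GCS and Pub/Sub events:

```python
import json

# Hypothetical sample record; the real events come from the GCS files and the topic.
line_from_gcs = '{"user": "a", "event": "click"}'         # ReadFromText yields str
payload_from_pubsub = b'{"user": "a", "event": "click"}'  # ReadFromPubSub yields bytes

gcs_event = json.loads(line_from_gcs)
live_event = json.loads(payload_from_pubsub)  # bytes are accepted since Python 3.6

# Both branches produce equal plain dicts, so Flatten receives compatible elements.
print(gcs_event == live_event)  # True
```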
Error:
{code:java}
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all
    self._check_replacement(override)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement
    self.visit(ReplacementValidator())
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  [Previous line repeated 4 more times]
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit
    visitor.visit_transform(self)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform
    transform_node)
RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
{code}
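The direct runner leaves the pipeline in an invalid state when it rewrites the transforms: after pipeline.replace_all(_get_transform_overrides(options)), the replacement check still finds the _GroupByKeyOnly node inside the GCS read's SDFBoundedSourceReader expansion, a node that one of the runner's overrides was supposed to replace.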
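For context on where this error comes from: per the traceback, replace_all applies the runner's overrides and then calls _check_replacement, whose ReplacementValidator visits every node and raises if a node still matches an override. A comment in Beam's replace_all also warns that a node replaced by one override must not be re-added by a later one, so the order of overrides matters. The toy model below is only a sketch of that mechanism under those assumptions: Node, Override, apply_override, the node kinds and both overrides are hypothetical and are not Beam's API, and it does not claim to be the actual root cause of this bug. It shows how an expansion that introduces a GroupByKey after the GroupByKey override has already run produces the same kind of "was not replaced as expected" error.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterator, List

# Toy model only (hypothetical names); not Beam's implementation.


@dataclass
class Node:
    """Stand-in for an applied transform: a name, a kind, and child parts."""
    name: str
    kind: str
    parts: List["Node"] = field(default_factory=list)


@dataclass
class Override:
    """Stand-in for a transform override: a matcher plus a replacement."""
    matches: Callable[[Node], bool]
    replace: Callable[[Node], Node]


def walk(node: Node) -> Iterator[Node]:
    """Yield every node in the tree, parents before children."""
    yield node
    for part in node.parts:
        yield from walk(part)


def apply_override(node: Node, override: Override) -> Node:
    """Return a copy of the tree with each matching node replaced."""
    if override.matches(node):
        return override.replace(node)
    return Node(node.name, node.kind,
                [apply_override(p, override) for p in node.parts])


def replace_all(root: Node, overrides: List[Override]) -> Node:
    # Apply the overrides one after another, in list order.
    for override in overrides:
        root = apply_override(root, override)
    # Afterwards, no node may still match any override.
    for override in overrides:
        for node in walk(root):
            if override.matches(node):
                raise RuntimeError(
                    f"Transform node {node.name} was not replaced as expected.")
    return root


# Hypothetical overrides: one swaps GroupByKeyOnly for a streaming variant,
# the other expands a bounded read into a composite containing a GroupByKeyOnly.
gbk_override = Override(
    matches=lambda n: n.kind == "GroupByKeyOnly",
    replace=lambda n: Node(n.name, "StreamingGroupByKeyOnly"),
)
read_override = Override(
    matches=lambda n: n.kind == "BoundedRead",
    replace=lambda n: Node(n.name, "SDFRead",
                           [Node(n.name + "/GroupByKey", "GroupByKeyOnly")]),
)

# Shape of the reported pipeline: a bounded read, an unbounded read, a flatten.
pipeline = Node("root", "Composite", [
    Node("Read from GCS", "BoundedRead"),
    Node("Read from Pubsub", "UnboundedRead"),
    Node("flatten", "Flatten"),
])
```

With the read override applied first, replace_all(pipeline, [read_override, gbk_override]) succeeds and the new GroupByKey ends up as a StreamingGroupByKeyOnly. With the opposite order, replace_all(pipeline, [gbk_override, read_override]) raises RuntimeError: Transform node Read from GCS/GroupByKey was not replaced as expected, because the GroupByKey appears only after its override has already run.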

 


