Posted to dev@beam.apache.org by Ahmet Altay <al...@google.com> on 2019/02/01 01:16:44 UTC
Re: Beam Python streaming pipeline on Flink Runner
+1 to Thomas's idea as a way to enable Python users on Flink. On the other
hand, this will be throwaway work once SDF is supported. How far are we
from SDF support?
On Thu, Jan 31, 2019 at 9:18 AM Maximilian Michels <mx...@apache.org> wrote:
> Ah, I thought you meant native Flink transforms.
>
> Exactly! The translation code is already there. The main challenge is how
> to programmatically configure the BeamIO from Python. I suppose that is
> also an unsolved problem for cross-language transforms in general.
>
> For Matthias' pipeline with PubSubIO we can build something specific, but
> for the general case there should be a way to initialize a Beam IO via a
> configuration map provided by an external environment.
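To make the "configuration map" idea concrete, here is a minimal plain-Python sketch, assuming a hypothetical scheme (the URN, field names, and functions below are all made up for illustration, not a real Beam API): the Python side serializes the IO's settings into a language-neutral payload, and the external (Java) environment decodes it to instantiate the transform.

```python
import json

# Hypothetical sketch: hand a Java IO its settings as a language-neutral
# configuration map. All names and the URN are invented for illustration.
def encode_io_config(urn, config):
    # Serialize the transform identifier plus its settings deterministically.
    return json.dumps({"urn": urn, "config": config}, sort_keys=True).encode("utf-8")

def decode_io_config(payload):
    # Inverse operation, as the external (Java) environment would perform it.
    spec = json.loads(payload.decode("utf-8"))
    return spec["urn"], spec["config"]

payload = encode_io_config(
    "beam:transform:pubsub_read:v1",  # made-up URN for illustration
    {"subscription": "projects/p/subscriptions/s", "with_attributes": False},
)
urn, config = decode_io_config(payload)
```

The point is only the shape of the contract: as long as both sides agree on the payload encoding, the Python SDK never needs to link against the Java IO's classes.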
>
> On 31.01.19 17:36, Thomas Weise wrote:
> > Exactly, that's what I had in mind.
> >
> > A Flink runner native transform would make the existing unbounded
> > sources available, similar to:
> >
> > https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
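The runner-native approach can be sketched in plain Python (the real translators linked above are Java; the URN and every name below are invented for illustration): the SDK side emits only a URN plus a payload, and the runner keeps a URN-to-translator map that substitutes a native source during translation.

```python
import json

# Illustrative stand-in for the translator-map pattern used by the Flink
# runner. URN and function names are made up; a real translator would
# construct an actual Flink source instead of returning a tag.
PUBSUB_URN = "custom:flink:pubsub_read:v1"

TRANSLATORS = {}

def register_translator(urn):
    # Decorator that records a translation function under its URN.
    def wrapper(fn):
        TRANSLATORS[urn] = fn
        return fn
    return wrapper

@register_translator(PUBSUB_URN)
def translate_pubsub_read(payload):
    # Stub translation: return a tag plus the decoded configuration.
    return "native-flink-pubsub-source", json.loads(payload)

kind, config = TRANSLATORS[PUBSUB_URN](
    json.dumps({"subscription": "projects/p/subscriptions/s"})
)
```

This is also essentially how the Lyft translations referenced later in the thread hook in: the runner recognizes a known URN and bypasses the portable source machinery entirely.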
> >
> >
> >
> >
> > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels <mxm@apache.org> wrote:
> >
> >     Wouldn't it be even more useful for the transition period if we
> >     enabled Beam IO to be used via Flink (like in the legacy Flink
> >     Runner)? In this particular example, Matthias wants to use PubSubIO,
> >     which is not even available as a native Flink transform.
> >
> > On 31.01.19 16:21, Thomas Weise wrote:
> > > Until SDF is supported, we could also add Flink runner native
> > > transforms for selected unbounded sources [1].
> > >
> > > That might be a reasonable option to unblock users that want to try
> > > Python streaming on Flink.
> > >
> > > Thomas
> > >
> > > [1]
> > > https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
> > >
> > >
> > > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels <mxm@apache.org> wrote:
> > >
> > > > I have a hard time imagining how we can map in a generic way
> > > > RestrictionTrackers into the existing Bounded/UnboundedSource, so I
> > > > would love to hear more about the details.
> > >
> > > Isn't it the other way around? The SDF is a generalization of
> > > UnboundedSource. So we would wrap UnboundedSource using SDF. I'm not
> > > saying it is trivial, but SDF offers all the functionality that
> > > UnboundedSource needs.
> > >
> > > For example, the @GetInitialRestriction method would call split on
> > > the UnboundedSource and the restriction trackers would then be used
> > > to process the splits.
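A toy model of that wrapping scheme in plain Python (deliberately not the actual Beam SDF API; every class and method here is a stand-in invented for illustration): the initial restriction is derived from the source's split() call, and a tracker then hands out splits one at a time for processing.

```python
# Stand-in classes sketching the SDF-over-UnboundedSource idea.
class FakeUnboundedSource:
    def __init__(self, num_shards):
        self.num_shards = num_shards

    def split(self, desired_num_splits):
        # Real sources return sub-sources; strings suffice for the sketch.
        return [f"shard-{i}" for i in range(min(self.num_shards, desired_num_splits))]

def get_initial_restriction(source, desired_num_splits=4):
    # What an @GetInitialRestriction method could do: delegate to split()
    # and treat the resulting list of splits as the restriction.
    return source.split(desired_num_splits)

class SplitRestrictionTracker:
    # Minimal stand-in for a restriction tracker over a list of splits.
    def __init__(self, splits):
        self.remaining = list(splits)

    def try_claim(self):
        # Claim the next unprocessed split, or None when exhausted.
        return self.remaining.pop(0) if self.remaining else None

tracker = SplitRestrictionTracker(get_initial_restriction(FakeUnboundedSource(3)))
processed = []
while (split := tracker.try_claim()) is not None:
    processed.append(split)
```

The hard parts the toy elides are exactly the ones debated here: checkpointing the tracker mid-split, watermarks, and resuming from a checkpoint mark, which real unbounded sources require.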
> > >
> > > On 31.01.19 15:16, Ismaël Mejía wrote:
> > > >> Not necessarily. This would be one way. Another way is to build
> > > >> an SDF wrapper for UnboundedSource. Probably the easier path for
> > > >> migration.
> > > >
> > > > That would be fantastic, I have heard about such a wrapper multiple
> > > > times but so far there is not any realistic proposal. I have a hard
> > > > time imagining how we can map in a generic way RestrictionTrackers
> > > > into the existing Bounded/UnboundedSource, so I would love to hear
> > > > more about the details.
> > > >
> > > > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels <mxm@apache.org> wrote:
> > > >>
> > > >> > In addition to having support in the runners, this will require
> > > >> > a rewrite of PubsubIO to use the new SDF API.
> > > >>
> > > >> Not necessarily. This would be one way. Another way is to build
> > > >> an SDF wrapper for UnboundedSource. Probably the easier path for
> > > >> migration.
> > > >>
> > > >> On 31.01.19 14:03, Ismaël Mejía wrote:
> > > >>>> Fortunately, there is already a pending PR for cross-language
> > > >>>> pipelines which will allow us to use Java IO like PubSub in
> > > >>>> Python jobs.
> > > >>>
> > > >>> In addition to having support in the runners, this will require
> > > >>> a rewrite of PubsubIO to use the new SDF API.
> > > >>>
> > > >>> On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels <mxm@apache.org> wrote:
> > > >>>>
> > > >>>> Hi Matthias,
> > > >>>>
> > > >>>> This is already reflected in the compatibility matrix, if you
> > > >>>> look under SDF. There is no UnboundedSource interface for
> > > >>>> portable pipelines. That's a legacy abstraction that will be
> > > >>>> replaced with SDF.
> > > >>>>
> > > >>>> Fortunately, there is already a pending PR for cross-language
> > > >>>> pipelines which will allow us to use Java IO like PubSub in
> > > >>>> Python jobs.
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Max
> > > >>>>
> > > >>>> On 31.01.19 12:06, Matthias Baetens wrote:
> > > >>>>> Hey Ankur,
> > > >>>>>
> > > >>>>> Thanks for the swift reply. Should I change this in the
> > > >>>>> capability matrix
> > > >>>>> <https://s.apache.org/apache-beam-portability-support-table> then?
> > > >>>>>
> > > >>>>> Many thanks.
> > > >>>>> Best,
> > > >>>>> Matthias
> > > >>>>>
> > > >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur Goenka <goenka@google.com> wrote:
> > > >>>>>
> > > >>>>> Hi Matthias,
> > > >>>>>
> > > >>>>>     Unfortunately, unbounded reads including pubsub are not yet
> > > >>>>>     supported for portable runners.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Ankur
> > > >>>>>
> > > >>>>>     On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens
> > > >>>>>     <baetensmatthias@gmail.com> wrote:
> > > >>>>>
> > > >>>>> Hi everyone,
> > > >>>>>
> > > >>>>>         Last few days I have been trying to run a streaming
> > > >>>>>         pipeline (code on GitHub
> > > >>>>>         <https://github.com/matthiasa4/beam-demo>) on a Flink
> > > >>>>>         Runner.
> > > >>>>>
> > > >>>>>         I am running a Flink cluster locally (v1.5.6
> > > >>>>>         <https://flink.apache.org/downloads.html>).
> > > >>>>>         I have built the SDK Harness Container:
> > > >>>>>         ./gradlew :beam-sdks-python-container:docker
> > > >>>>>         and started the JobServer:
> > > >>>>>         ./gradlew :beam-runners-flink_2.11-job-server:runShadow -PflinkMasterUrl=localhost:8081
> > > >>>>>
> > > >>>>>         I run my pipeline with:
> > > >>>>>         env/bin/python streaming_pipeline.py --runner=PortableRunner --job_endpoint=localhost:8099 --output xxx --input_subscription xxx --output_subscription xxx
> > > >>>>>
> > > >>>>>         All this is running inside an Ubuntu (Bionic) VM in
> > > >>>>>         VirtualBox.
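For reference, the submission flags above can be assembled programmatically; this is only a convenience sketch that mirrors the command line (the resource names are placeholders standing in for the redacted `xxx` values, and the job endpoint matches the JobServer started with runShadow).

```python
# Build the argv list for the PortableRunner submission shown above.
def portable_runner_argv(job_endpoint, output, input_subscription,
                         output_subscription):
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--output={output}",
        f"--input_subscription={input_subscription}",
        f"--output_subscription={output_subscription}",
    ]

argv = portable_runner_argv(
    "localhost:8099",                 # must match the JobServer port
    "out-path",                       # placeholder
    "projects/p/subscriptions/in",    # placeholder
    "projects/p/subscriptions/out",   # placeholder
)
```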
> > > >>>>>
> > > >>>>>         The job submits fine, but unfortunately fails after a
> > > >>>>>         few seconds with the error attached.
> > > >>>>>
> > > >>>>> Anything I am missing or doing wrong?
> > > >>>>>
> > > >>>>> Many thanks.
> > > >>>>> Best,
> > > >>>>> Matthias
> > > >>>>>
> > > >>>>>