Posted to user@beam.apache.org by Pascal Gula <pa...@plantix.net> on 2019/01/04 15:11:46 UTC

Re: Reading from custom Unbounded Sources with the Python SDK

@david, did you try out and refine the example that @robert mentioned in
his last email?

I have a similar use-case and wanted to get some feedback from you before
trying it out myself.

On Tue, 13 Nov 2018 at 16:15, Robert Bradshaw <ro...@google.com> wrote:

> On Tue, Nov 13, 2018 at 3:44 PM David Gasquez <da...@gmail.com>
> wrote:
> >
> > Thanks so much for the help here, Robert. If I understood it correctly,
> to work with an unbounded source in Python right now the best approach is
> to fake it using an initial create, state and timers.
>
> Yep, exactly.
>
> > If that's the case, do you have any code samples I can learn from? Not
> sure how that looks, to be honest.
>
> I can't think of any examples offhand, but it would look something like:
>
>
> import time
>
> import pymongo
>
> import apache_beam as beam
> from apache_beam.transforms.timeutil import TimeDomain
> from apache_beam.transforms.userstate import (
>     BagStateSpec, TimerSpec, on_timer)
> from apache_beam.utils.timestamp import Timestamp
>
>
> class OplogSourceDoFn(beam.DoFn):
>
>     # Pickle the resume token (a dict) rather than treating it as raw bytes.
>     resume_token_spec = BagStateSpec('resume', beam.coders.PickleCoder())
>     timer_spec = TimerSpec('continue', TimeDomain.WATERMARK)
>
>     def __init__(self, uri, database, collection, batch_size=100):
>         super(OplogSourceDoFn, self).__init__()
>         self.uri = uri
>         self.database = database
>         self.collection = collection
>         self.batch_size = batch_size
>
>     def client(self):
>         self._client = pymongo.MongoClient(
>             self.uri, readPreference="secondaryPreferred")
>         return self._client
>
>     def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
>         # Set a timer to get things going.  This will fire soon.
>         timer.set(Timestamp.of(time.time()))
>
>     @on_timer(timer_spec)
>     def resume(self,
>                timer=beam.DoFn.TimerParam(timer_spec),
>                resume_tokens=beam.DoFn.StateParam(resume_token_spec)):
>
>         # Get the latest mongodb resume token, if any.
>         last_token = next(iter(resume_tokens.read()), None)
>
>         # Open the stream.
>         client = self.client()
>         self.db = client.get_database(self.database)
>         self.col = self.db.get_collection(self.collection)
>         self.cursor = self.col.watch(
>             full_document="updateLookup", resume_after=last_token)
>
>         # Read at most batch_size elements from the stream.
>         change = None
>         with self.cursor as stream:
>             for _, change in zip(range(self.batch_size), stream):
>                 yield change
>
>         # If anything was read, set the last one as our resume token.
>         if change:
>             resume_tokens.clear()
>             resume_tokens.add(change.get('_id'))
>
>         # Schedule this method to run again.
>         timer.set(Timestamp.of(time.time()))
>
>
> Warning, this is email-window authored, completely untested code, so YMMV.
> But hopefully it serves as an example of the core concepts.
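>
> One thing the sketch above glosses over: state and timers in the Python
> SDK are scoped per key (and window), so the input to a stateful DoFn has
> to be a keyed PCollection. The wiring would look roughly like the
> following (again untested, reusing the URI/DATABASE/COLLECTION constants
> from your snippet):
>
> changes = (
>     p
>     | 'seed' >> beam.Create([('oplog', None)])  # single keyed element
>     | 'read_oplog' >> beam.ParDo(
>         OplogSourceDoFn(URI, DATABASE, COLLECTION)))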
>
> > Finally, I'm thinking it might be simpler for me to use the provided
> PubSub streaming source. That'd mean using an external service to place the
> events in PubSub. It should also take care of checkpointing and handling
> errors!
>
> If you have somewhere you can run a process that reads from mongo and
> publishes to pubsub, that'd do too.
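>
> For that route the Beam side is just the built-in PubSub source, and the
> external process can be a small script that tails the change stream. A
> rough, untested sketch (the project, topic and subscription names are
> made up, and URI/DATABASE/COLLECTION are your connection constants):
>
> # Outside Beam: tail the change stream and publish each event.
> import json
> import pymongo
> from google.cloud import pubsub_v1
>
> publisher = pubsub_v1.PublisherClient()
> topic = publisher.topic_path('my-project', 'mongo-oplog')
> client = pymongo.MongoClient(URI, readPreference="secondaryPreferred")
> with client[DATABASE][COLLECTION].watch(full_document="updateLookup") as stream:
>     for change in stream:
>         publisher.publish(topic, json.dumps(change, default=str).encode('utf-8'))
>
> # In the Beam streaming pipeline: read and decode the events.
> changes = (
>     p
>     | beam.io.ReadFromPubSub(
>         subscription='projects/my-project/subscriptions/mongo-oplog-sub')
>     | beam.Map(json.loads))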
>
> > On Tue, 13 Nov 2018 at 14:33, Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> Just an addendum, you should be able to fake this in the meantime by
> >> starting with an initial create and using state and timers. One
> >> problem with the source as written above is that it will never
> >> checkpoint, meaning if your pipeline crashes it will start again from
> >> the beginning (including all the downstream processing). You could
> >> possibly get a resume token from your cursor, store that to state, and
> >> exit the DoFn. In your timer callback, you would resume reading for a
> >> while and then set another timer, just as before. See
> >> https://s.apache.org/beam-python-user-state-and-timers and related
> >> docs for all the details.
> >>
> >> Don't hesitate to respond to the thread if anything isn't clear or you
> >> have additional questions (or success stories!).
> >>
> >> - Robert
> >>
> >> On Tue, Nov 13, 2018 at 2:25 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >> >
> >> > The future of Beam sources is SDF, see
> >> > https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> >> >
> >> > This is actively being worked on, but we're still in the present. For
> >> > bounded sources, you still may want to use the Source API (which, in
> >> > Python, is much closer to what SDF is settling down to be, so it
> >> > should be an easy port once that time comes). Unfortunately, Python
> >> > Streaming doesn't yet support anything but PubSub.
> >> > On Tue, Nov 13, 2018 at 12:24 PM David Gasquez <
> davidgasquez@gmail.com> wrote:
> >> > >
> >> > > Hey there,
> >> > >
> >> > > I've been exploring Apache Beam lately and I'm now working on my
> first production pipeline. The goal of this pipeline is to replicate a
> MongoDB collection into BigQuery. To do that, I want to read the MongoDB
> oplog and use those events to update the table in BigQuery (happy to
> expand more on this if needed).
> >> > >
> >> > > The MongoDB oplog is an unbounded source. I was wondering what the
> best practices are for dealing with this kind of source in Python. Currently,
> I'm using a custom beam.DoFn to read the oplog inside a streaming pipeline.
> That said, I'm not sure how this will behave or how it can be improved (the
> pipeline relies on a beam.Create([0]) first step that seems hacky to me).
> >> > >
> >> > > These are the key snippets of the code:
> >> > >
> >> > > ```
> >> > > class OplogSourceDoFn(beam.DoFn):
> >> > >     def __init__(self, uri, database, collection):
> >> > >         super(OplogSourceDoFn, self).__init__()
> >> > >         self.uri = uri
> >> > >         self.database = database
> >> > >         self.collection = collection
> >> > >
> >> > >     def client(self):
> >> > >         self._client = pymongo.MongoClient(
> >> > >             self.uri, readPreference="secondaryPreferred")
> >> > >         return self._client
> >> > >
> >> > >     def process(self, element):
> >> > >         client = self.client()
> >> > >         self.db = client.get_database(self.database)
> >> > >         self.col = self.db.get_collection(self.collection)
> >> > >         self.cursor = self.col.watch(full_document="updateLookup")
> >> > >
> >> > >         with self.cursor as stream:
> >> > >             for change in stream:
> >> > >                 yield change
> >> > >
> >> > > pipeline = (
> >> > >     p
> >> > >     | 'dummy_create' >> beam.Create([0])
> >> > >     | 'read_oplog' >> beam.ParDo(
> >> > >         OplogSourceDoFn(URI, DATABASE, COLLECTION))
> >> > >     | 'process' >> beam.Map(process)
> >> > > )
> >> > > ```
> >> > >
> >> > > My hunch is that there's a way to leverage the StreamingCreate
> PTransform to read the MongoDB oplog or any other external unbounded
> source. Alternatively, I've also seen a good example of how to create a
> BoundedSource. This might be similar for an unbounded one, but I think the
> Beam Programming Guide discourages building sources using the Source API.
> >> > >
> >> > > I'd appreciate any input or feedback you might have about the code
> and approach I'm taking!
> >> > >
> >> > > Thanks,
> >> > > David.
>


-- 

Pascal Gula
Senior Data Engineer / Scientist
+49 (0)176 34232684
www.plantix.net
PEAT GmbH
Kastanienallee 4
10435 Berlin // Germany
Download the App! <https://play.google.com/store/apps/details?id=com.peat.GartenBank>