Posted to user@beam.apache.org by Christopher Bockman <c....@gmail.com> on 2017/11/02 17:26:51 UTC

design pattern for enriching data via db lookups?

Hi,

We're evaluating Beam and trying to figure out if it meets our needs
(suspect it does!), and, if so, how to best set up our code.

Many thanks in advance.

*Basic scenario:*

* Data (for machine learning/prediction) comes in.  Data is a set of
documents which are 100% independent.  We want to apply some
transformations to those items on a per-doc basis.

- Many of the transformations are easily and obviously encapsulated in Beam
user code.

- *However, we'd also like to enrich the data via queries to external
databases.  How do we do that efficiently *(largely in time, but also in
compute resources)*?*

*Additional constraints:*

- We are developing on Google Cloud, if it matters.

- Ideally we can achieve the below in Python (versus Java), to ease porting
existing code.

*Some examples:*

1) *Key-value query.*

Text comes in, and we'd like to do some pre-processing to the text, and
then look up certain subsets of that text against an external database.
Those found mappings need to be associated with the underlying text.

E.g., imagine we're doing Named Entity Recognition and trying to augment
with a large, fine-grained external gazetteer.

"I went to the Los Angeles Dodgers game."  (RIP)

Perhaps we generate ngrams ("I", ..., "I went", "went to", ..., "I went
to", ..., "Los Angeles Dodgers", ...) and then find that "Los Angeles
Dodgers" maps to entity 123456, and "Los Angeles" maps to 234567, and we
want to map those back into the underlying document.
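For concreteness, here is roughly what that lookup looks like in isolation
(a plain dict standing in for the external gazetteer; the names and ids are
just illustrative):

def ngrams(tokens, max_n=3):
    """Yield all 1- to max_n-word ngrams from a token list."""
    for n in range(1, max_n + 1):
        for i in range(len(tokens) - n + 1):
            yield ' '.join(tokens[i:i + n])

# In-memory stand-in for the external gazetteer.
gazetteer = {'los angeles dodgers': 123456, 'los angeles': 234567}

text = "I went to the Los Angeles Dodgers game."
tokens = text.lower().rstrip('.').split()  # naive tokenization, for illustration
matches = {g: gazetteer[g] for g in ngrams(tokens) if g in gazetteer}
# matches == {'los angeles': 234567, 'los angeles dodgers': 123456}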

2) *More sophisticated query.*

We do a bunch of calculations on the data, and then derive some
intermediary result, and need to look that result up against an external
database to generate a final result for the data.  These queries might
require a bit more SQL sophistication (table joining, looking up multiple
rows and filtering, etc.).

* Scenario #1 is more important than #2, because, worst case, we can
probably cast most of our external enrichment to a key-value paradigm.

*The concern: the database query becomes the choke point*

* The most naive implementation would seem to be to write user code that
grabs each doc and does a remote database lookup for that doc.

We initially had this implemented (outside of Beam), but found that
(unsurprisingly) *the round-trip to the database became a blocker*--code
would just sit waiting on the DB round-trip, so processing slowed down
dramatically (compared to keeping the db local via an, ultimately
unmanageable, local SQLite instance).

Our solution was to 1) implement multi-threading (to limit blocking on the
db queries) and 2) implement local caching of lookups (using
https://dogpilecache.readthedocs.io/en/latest/usage.html).  Both of these
dramatically sped things up for the single-machine (non-Beam) scenario.
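(For reference, the caching piece was a simple decorator along these lines;
the memory backend, expiration time, and lookup function here are
illustrative placeholders rather than our exact code:)

from dogpile.cache import make_region

# Process-local cache with an (illustrative) one-hour expiration.
region = make_region().configure('dogpile.cache.memory', expiration_time=3600)

def query_external_db(ngram):
    # Placeholder for the real per-ngram remote DB query.
    raise NotImplementedError

@region.cache_on_arguments()
def lookup_entity(ngram):
    # Cached wrapper: the remote query only runs on a cache miss.
    return query_external_db(ngram)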

*Is there an equivalent (direct code or design pattern) of either #1 or #2
in Beam?  *(The answer to #1 might just be that Beam automatically adds
more documents to be processed when it realizes things are slower than they
"should be"?)

*Possible solution?: pull the table(s), in full, down to the Beam cluster*

* The tables we're working with aren't terribly large by modern standards
(although I'd like to design for this potentially changing)--maybe a few GB
at most, and probably easily shaved down to 100s of MBs.  Conceptually--if
quicker--we could (I think?) do something like pull the entire table down
into a PCollection and then use that data "locally" (i.e., within the Beam
cluster).

- Presumably, for key-value lookup, we could write some query to
efficiently cross-reference the two PCollections (i.e., the db and the
actual source data).  (...although I haven't dug deeply enough into Beam
to confirm this is realistically doable?)

- For more complex queries, it is less clear to me how to do the above,
because we'd basically need to be able to run SQL queries on the Beam
cluster.  (Possibly via https://beam.apache.org/documentation/dsls/sql/?
Although I'm not clear how solid this is or if it is available on Cloud
Dataflow.)

*If pulling the table down into the cluster is the correct solution, is
there a recommended way to do so?*  Would it be via JDBC or a Redis
connector (for k:v) (https://beam.apache.org/documentation/io/built-in/),
perhaps?  (Although, per top, pure Python would be preferable.)
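(If it helps make the question concrete: on GCP I could imagine something
like the below to pull a lookup table into a key-value PCollection,
assuming, purely for illustration, that it lived in BigQuery with
'surface_form' and 'entity_id' columns--but I don't know if that's the
recommended route.)

import apache_beam as beam

with beam.Pipeline() as p:
    gazetteer = (
        p
        | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(
              table='my-project:my_dataset.gazetteer'))  # hypothetical table
        | 'ToKV' >> beam.Map(lambda row: (row['surface_form'],
                                          row['entity_id'])))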

Re: design pattern for enriching data via db lookups?

Posted by Lukasz Cwik <lc...@google.com>.
For joining with external data you have some options:
* Do direct calls to the external datastore and perform your own in-memory
caching/expiration. You control exactly what happens and when it happens,
but as you have done this in the past you know what this entails.
* Ingest the external data and perform a CoGBK
<https://beam.apache.org/documentation/programming-guide/#cogroupbykey> on
a common key (rough sketch after this list). Works well for datasets which
have comparable data sizes.
* Ingest the external data and use it as a map/multimap side input
<https://beam.apache.org/documentation/programming-guide/#side-inputs>.
Works well when one dataset is much smaller than the other (especially if
the smaller dataset can fit in memory).
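A rough Python sketch of the CoGBK option, with Create used as a stand-in
for however you actually ingest the two datasets:

import apache_beam as beam

with beam.Pipeline() as p:
    # Both collections keyed by the common key (e.g. the ngram surface form).
    doc_ngrams = p | 'DocNgrams' >> beam.Create(
        [('los angeles dodgers', 'doc-1'), ('los angeles', 'doc-1')])
    gazetteer = p | 'Gazetteer' >> beam.Create(
        [('los angeles dodgers', 123456), ('los angeles', 234567)])

    joined = ({'docs': doc_ngrams, 'gazetteer': gazetteer}
              | beam.CoGroupByKey()
              | beam.Map(print))
    # Each element looks like ('los angeles', {'docs': ['doc-1'],
    #                                          'gazetteer': [234567]})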

Based on your data set being small, I would suggest using the side input
approach. When you ingest the external data, you can perform any transforms
that are required to get to the common key, including running the Beam SQL
stuff. The SQL stuff is available for Cloud Dataflow but not yet officially
supported. As for ingesting the external data, it all depends on where it
is coming from, but the closest IO connector to your data source is the best.
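And a rough sketch of the side input approach, loading the external data as
a dict side input (again, Create stands in for your actual ingestion, and
the lookup only checks single tokens for brevity):

import apache_beam as beam

def enrich(doc, gazetteer):
    # 'gazetteer' arrives here as a plain dict (materialized per worker).
    tokens = doc.lower().rstrip('.').split()
    found = {t: gazetteer[t] for t in tokens if t in gazetteer}
    return (doc, found)

with beam.Pipeline() as p:
    gazetteer = p | 'Gazetteer' >> beam.Create([('dodgers', 123456)])
    docs = p | 'Docs' >> beam.Create(
        ['I went to the Los Angeles Dodgers game.'])

    enriched = docs | 'Enrich' >> beam.Map(
        enrich, gazetteer=beam.pvalue.AsDict(gazetteer))
    # -> ('I went to the Los Angeles Dodgers game.', {'dodgers': 123456})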

