Posted to dev@beam.apache.org by Rion Williams <ri...@gmail.com> on 2020/06/02 00:17:04 UTC

Multiple Tenant Consolidation via ElasticSearch in Beam Pipeline

Hi folks,

I was talking with a colleague about a scenario he was facing and we were exploring the idea of Beam as a possible solution. I thought I’d reach out to the audience here to get their opinions.

Basically, we have a series of single tenant Elasticsearch indices that we are attempting to read from, transform, and ultimately send to a Kafka topic to be consumed by some downstream multi-tenant Beam pipeline.

The current working thoughts are something to the effect of:
- Read all of the locations of the various tenant indices from a centralized location.
- Construct the appropriate transforms (perhaps just via a `configurations.forEach()` or some other pattern)
- Apply the transforms against the incoming data from Elastic (which should share a uniform schema)
- Send to a Kafka topic with the tenant identifier as part of the transform process
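
The steps above can be sketched in plain Python to show the intended data flow. This is only an illustration, not Beam code: `TenantConfig`, `read_documents`, `to_kafka_record`, and the in-memory `FAKE_INDICES` are all hypothetical stand-ins for the central configuration store, the Elasticsearch read, and the Kafka write.

```python
import json
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, List, Tuple


@dataclass(frozen=True)
class TenantConfig:
    """Hypothetical descriptor for one single-tenant index."""
    tenant_id: str
    index: str


# Hypothetical in-memory stand-in for the per-tenant Elasticsearch indices.
FAKE_INDICES: Dict[str, List[dict]] = {
    "tenant-a-docs": [{"id": 1, "name": "alpha"}, {"id": 2, "name": "beta"}],
    "tenant-b-docs": [{"id": 7, "name": "gamma"}],
}


def read_documents(config: TenantConfig) -> Iterator[dict]:
    """Stand-in for reading every document from one tenant's index."""
    yield from FAKE_INDICES[config.index]


def to_kafka_record(config: TenantConfig, doc: dict) -> Tuple[str, str]:
    """Build a (key, value) pair; the tenant id is the key and also a field."""
    payload = dict(doc, tenant_id=config.tenant_id)
    return config.tenant_id, json.dumps(payload, sort_keys=True)


def consolidate(configs: Iterable[TenantConfig]) -> List[Tuple[str, str]]:
    """Read each tenant, apply the shared transform, and collect the output."""
    return [
        to_kafka_record(config, doc)
        for config in configs
        for doc in read_documents(config)
    ]


CONFIGS = [
    TenantConfig("tenant-a", "tenant-a-docs"),
    TenantConfig("tenant-b", "tenant-b-docs"),
]
RECORDS = consolidate(CONFIGS)
```

Using the tenant id as the record key is one possible design choice here: it would keep each tenant's records together on the Kafka side, at the cost of uneven partitions when one tenant dominates the volume.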

Does this seem like something Beam would be suitable for? These indices could be quite large and updated frequently (depending on the tenant), so I don’t know if watermarking should be a concern. Specifically, I’m thinking of watermarking for each tenant to avoid ingesting the same data twice and, if the services go down, to be able to resume without losing data. I’m not aware of Beam or the Elastic connector having this notion, but I haven’t explored it in depth.

Additionally, there are scaling concerns (e.g. if one particular tenant has a large amount of volume and others have very little, are there mechanisms for handling that?). What if there were thousands of tenants? Could a single pipeline effectively handle that kind of volume?

Any thoughts or advice would be appreciated. I’d love to have the confidence to use Beam for this, but if it’s not the right tool I don’t want to fight it more than necessary.

Thanks,

Rion

Re: Multiple Tenant Consolidation via ElasticSearch in Beam Pipeline

Posted by Rion Williams <ri...@gmail.com>.
Hi Luke,

Thanks for the response, it was extremely useful. 

After exploring a bit more, it doesn't look like the existing Elasticsearch connector currently supports the `readAll()` pattern, nor is it backed by a SplittableDoFn (although there is an open JIRA for it <https://issues.apache.org/jira/browse/BEAM-6654>). That leads me to believe that we'd need to construct a rather large graph, which may or may not be feasible.

The rough numbers for this scenario are several thousand tenants (~4000) with at least two transforms per tenant, which seems like it'd be pushing the size of the graph to its limits and would require the use of the `readAll()` pattern. I believe that the current proposed approach also relies on Spark as a runner, which seems to have some notion of dynamic allocation for handling parallelism, but that might be moot if we can't read all of those sources in a single pipeline.

I think the two major concerns are as follows:

- Scaling - Volumes may vary widely from tenant to tenant (e.g. one may have hundreds of thousands of records daily, others may be orders of magnitude smaller), so we'd need to ensure that Beam / Spark is smart enough to allocate workers to meet the demands of a "hungrier" tenant.
- Watermarking - As far as I can tell, the Elasticsearch connector has no support for persisting watermarks, and I'm unsure if there are any patterns for handling this at a tenant level. Because of the volume discrepancies, watermarks would need to be retained so that subsequent calls know where to read from. Would multiple workers complicate this if they are reading from the same source?
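
To make the watermarking concern concrete, here is a minimal sketch of per-tenant checkpointing in plain Python. It is not something the Elasticsearch connector provides: `CheckpointStore` and `read_new` are hypothetical names, and the sketch assumes every record carries an integer timestamp (for example an "updated at" field) that only increases within a tenant.

```python
from typing import Dict, Iterable, List, Tuple

# A record is (timestamp, payload). Increasing timestamps per tenant are an
# assumption of this sketch, not a guarantee of any real index.
Record = Tuple[int, str]


class CheckpointStore:
    """Hypothetical store of the highest timestamp emitted per tenant."""

    def __init__(self) -> None:
        self._marks: Dict[str, int] = {}

    def get(self, tenant_id: str) -> int:
        # -1 means nothing has been read yet for this tenant.
        return self._marks.get(tenant_id, -1)

    def advance(self, tenant_id: str, timestamp: int) -> None:
        # Never move a checkpoint backwards.
        self._marks[tenant_id] = max(self.get(tenant_id), timestamp)


def read_new(tenant_id: str, index: Iterable[Record], store: CheckpointStore) -> List[Record]:
    """Return only records newer than the tenant's checkpoint, then advance it."""
    since = store.get(tenant_id)
    fresh = sorted(record for record in index if record[0] > since)
    if fresh:
        store.advance(tenant_id, fresh[-1][0])
    return fresh
```

Two caveats apply to this sketch. A real implementation would presumably advance the checkpoint only after the downstream write has succeeded, otherwise a crash between the read and the write loses data. It also assumes a single reader per tenant; with several workers on the same source, the store would need some form of coordination, which is exactly the open question raised above.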

I'll continue to explore things a bit more, but I certainly don't want to go against the grain of established patterns or undertake a large amount of work (such as attempting to add support for non-existent features) if another upstream tool could accomplish this.

Thanks again Luke,

Rion

On 2020/06/02 20:11:52, Luke Cwik <lc...@google.com> wrote: 
> Your use case seems fine and is likely more dependent on the quality of the
> IO connectors you use and/or how much time you're willing to invest in
> filling in any gaps they may have.
> 
> You'll want to make sure that the connector has good initial splitting
> logic to make sure that large tenants split more finely than small tenants.
> Using a runner that supports dynamic splitting would help fix imbalances
> caused by initial splitting.
> 
> If each tenant is represented by one or more transforms then you may run
> into issues where the pipeline graph is too large for some runners and you
> would want the connector to follow the readAll pattern that is seen on
> various IO connectors. This uses a common transform and the input is a
> PCollection of "source descriptors". This pattern scales to the data limits
> of the runner that is being used (orders of magnitude larger than the graph
> size). The readAll is best if powered by a splittable DoFn internally so we
> get initial splitting and dynamic splitting still happening. With the
> readAll pattern, the "read all the locations of various tenant indices"
> would move to occur within a DoFn and would happen when the pipeline
> executes.
> 

Re: Multiple Tenant Consolidation via ElasticSearch in Beam Pipeline

Posted by Luke Cwik <lc...@google.com>.
Your use case seems fine and is likely more dependent on the quality of the
IO connectors you use and/or how much time you're willing to invest in
filling in any gaps they may have.

You'll want to make sure that the connector has good initial splitting
logic to make sure that large tenants split more finely than small tenants.
Using a runner that supports dynamic splitting would help fix imbalances
caused by initial splitting.
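
As a rough illustration of size-aware initial splitting, the sketch below assigns each tenant a number of splits proportional to its document count. It does not describe how any existing connector splits its input; `plan_splits` and the `docs_per_split` target are hypothetical.

```python
from typing import Dict


def plan_splits(doc_counts: Dict[str, int], docs_per_split: int) -> Dict[str, int]:
    """Give each tenant ceil(count / docs_per_split) splits, with a minimum of one."""
    if docs_per_split <= 0:
        raise ValueError("docs_per_split must be positive")
    return {
        # Integer ceiling division avoids floating-point rounding on large counts.
        tenant: max(1, (count + docs_per_split - 1) // docs_per_split)
        for tenant, count in doc_counts.items()
    }
```

With a target of 50,000 documents per split, a tenant holding 500,000 documents would get ten splits while a tenant holding 1,200 would get one. Any imbalance left over after this static plan is what dynamic splitting in the runner would be expected to correct.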

If each tenant is represented by one or more transforms then you may run
into issues where the pipeline graph is too large for some runners and you
would want the connector to follow the readAll pattern that is seen on
various IO connectors. This uses a common transform and the input is a
PCollection of "source descriptors". This pattern scales to the data limits
of the runner that is being used (orders of magnitude larger than the graph
size). The readAll is best if powered by a splittable DoFn internally so we
get initial splitting and dynamic splitting still happening. With the
readAll pattern, the "read all the locations of various tenant indices"
would move to occur within a DoFn and would happen when the pipeline
executes.
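
The shape of the readAll pattern can be sketched with plain Python generators. This is not the Beam API: `SourceDescriptor`, `discover_sources`, `read_all`, and `fake_read` are hypothetical names, and in Beam the descriptors would instead flow through a PCollection into one shared transform. The point of the sketch is that the number of stages stays fixed while the number of tenants is just data.

```python
from typing import Callable, Dict, Iterable, Iterator, NamedTuple, Tuple


class SourceDescriptor(NamedTuple):
    """Hypothetical description of one tenant's index, passed around as data."""
    tenant_id: str
    index: str


def discover_sources(registry: Dict[str, str]) -> Iterator[SourceDescriptor]:
    """Runs at execution time: turn the central registry into descriptors."""
    for tenant_id, index in sorted(registry.items()):
        yield SourceDescriptor(tenant_id, index)


def read_all(
    descriptors: Iterable[SourceDescriptor],
    read_one: Callable[[SourceDescriptor], Iterable[dict]],
) -> Iterator[Tuple[str, dict]]:
    """One shared read step, applied to every descriptor that flows through."""
    for descriptor in descriptors:
        for doc in read_one(descriptor):
            yield descriptor.tenant_id, doc


def fake_read(descriptor: SourceDescriptor) -> Iterator[dict]:
    """Stand-in for the real index read; emits one document per tenant."""
    yield {"index": descriptor.index, "seq": 0}


# 4000 tenants pass through the same two stages; no per-tenant wiring is built.
REGISTRY = {f"tenant-{n:04d}": f"index-{n:04d}" for n in range(4000)}
OUTPUT = list(read_all(discover_sources(REGISTRY), fake_read))
```

Contrast this with building one read and one transform per tenant at construction time, where the graph itself would grow with the tenant count.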
