Posted to dev@beam.apache.org by JingsongLee <lz...@aliyun.com> on 2017/04/13 15:40:05 UTC

Join to external table

Hi all,


I've repeatedly seen the following pattern.
Consider this SQL query (a stream-to-table join example from Calcite):
SELECT STREAM o.rowtime, o.productId, o.orderId, o.units,
  p.name, p.unitPrice
FROM Orders AS o
JOIN Products AS p
  ON o.productId = p.productId;
A stream-to-table join is straightforward if the contents of the table are not
changing (or are changing only slowly). This query enriches a stream of orders
with each product’s list price.
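To make the pattern concrete, here is a minimal plain-Java sketch of what the query computes for a finite batch of orders. It assumes the Products table fits in an in-memory Map, which is exactly the assumption that breaks down for large tables. The types and the enrich method are hypothetical illustrations, not Beam or Calcite API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical row types mirroring the columns in the SQL query.
record Order(long rowtime, int productId, int orderId, int units) {}
record Product(int productId, String name, double unitPrice) {}
record EnrichedOrder(long rowtime, int productId, int orderId, int units,
                     String name, double unitPrice) {}

final class StreamTableJoin {
  // Inner join: each order is enriched with its product's name and price.
  // Orders whose productId is missing from the table are dropped,
  // matching the semantics of the SQL JOIN.
  static List<EnrichedOrder> enrich(List<Order> orders, Map<Integer, Product> products) {
    List<EnrichedOrder> out = new ArrayList<>();
    for (Order o : orders) {
      Product p = products.get(o.productId()); // point lookup by join key
      if (p != null) {
        out.add(new EnrichedOrder(o.rowtime(), o.productId(), o.orderId(),
            o.units(), p.name(), p.unitPrice()));
      }
    }
    return out;
  }
}
```

In a streaming pipeline the same per-element lookup would run inside a transform, which is why the question becomes where the table data lives and how fresh it is.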

In most cases this table lives in HBase, MySQL, or Redis. Most of our users think
we should implement the join with side inputs, but there are some difficulties:
1. The table may be very large. AFAIK, a side input loads all of its data internally.
We cannot load the whole table, but we can do some caching.
2. The table may be updated periodically, as mentioned in
https://issues.apache.org/jira/browse/BEAM-1197
3. Sometimes users want to update the table themselves, in scenarios that don’t
need high accuracy. (Reads and writes to the external storage can’t guarantee
exactly-once semantics.)
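Point 1 suggests keeping only a bounded working set of the table in memory and fetching missing rows on demand. A minimal sketch of that idea in plain Java, using LinkedHashMap's access order and removeEldestEntry for least-recently-used eviction. The class LruLookupCache and its loader function are hypothetical; a production version would more likely use an existing cache library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical bounded lookup cache: keeps at most maxEntries rows in memory
// and loads missing rows on demand instead of loading the whole table.
final class LruLookupCache<K, V> {
  private final Function<K, V> loader; // e.g. a point query against the external store
  private final LinkedHashMap<K, V> entries;
  private int loads = 0;               // number of times the loader was called

  LruLookupCache(int maxEntries, Function<K, V> loader) {
    this.loader = loader;
    // accessOrder = true orders entries from least to most recently accessed,
    // so the "eldest" entry is the least recently used one.
    this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries; // evict once the bound is exceeded
      }
    };
  }

  V get(K key) {
    V v = entries.get(key); // a hit also refreshes the entry's access order
    if (v == null) {
      v = loader.apply(key);
      loads++;
      if (v != null) {
        entries.put(key, v); // absent rows are not cached in this sketch
      }
    }
    return v;
  }

  int loadCount() { return loads; }
  int cachedCount() { return entries.size(); }
}
```

Because absent rows are not cached, repeated lookups of a missing key reach the backend every time; a negative cache would be one way to avoid that.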

So we developed a component called DimState (maybe not the right name). It either
uses a cache (currently a LoadingCache) or loads the whole table; both modes have a
time-to-live (TTL) mechanism. The abstract interface is called ExternalState, with
HBaseState, JDBCState, and RedisState implementations. State is accessed by key and
namespace, and bulk access to the external table is provided for performance.
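The message does not show the DimState/ExternalState API, so the following is only a guess at its general shape: a hypothetical ExternalState interface addressed by namespace and key with a bulk getAll, plus a wrapper that caches rows with a time-to-live and fetches all cache misses in a single bulk call. Every name here (ExternalState, getAll, TtlCachedState, the injectable clock) is an assumption for illustration, not the actual component.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical backend abstraction. An HBase, JDBC or Redis implementation
// might translate getAll into a multi-get, an IN query, or MGET.
interface ExternalState<K, V> {
  Map<K, V> getAll(String namespace, Collection<K> keys);
}

// Hypothetical TTL cache in front of an ExternalState: entries older than
// ttlMillis are treated as missing and re-fetched, so periodic updates to
// the external table become visible after at most one TTL.
final class TtlCachedState<K, V> {
  private record Cached<T>(T value, long loadedAt) {}

  private final ExternalState<K, V> backend;
  private final long ttlMillis;
  private final LongSupplier clock; // injectable time source, e.g. for tests
  private final Map<String, Map<K, Cached<V>>> cache = new HashMap<>();

  TtlCachedState(ExternalState<K, V> backend, long ttlMillis, LongSupplier clock) {
    this.backend = backend;
    this.ttlMillis = ttlMillis;
    this.clock = clock;
  }

  Map<K, V> getAll(String namespace, Collection<K> keys) {
    long now = clock.getAsLong();
    Map<K, Cached<V>> ns = cache.computeIfAbsent(namespace, n -> new HashMap<>());
    Map<K, V> result = new HashMap<>();
    List<K> misses = new ArrayList<>();
    for (K k : keys) {
      Cached<V> e = ns.get(k);
      if (e != null && now - e.loadedAt() < ttlMillis) {
        result.put(k, e.value()); // fresh cache hit
      } else {
        misses.add(k);            // absent or expired
      }
    }
    if (!misses.isEmpty()) {
      // One bulk round trip for all misses instead of one call per key.
      Map<K, V> fetched = backend.getAll(namespace, misses);
      for (Map.Entry<K, V> f : fetched.entrySet()) {
        ns.put(f.getKey(), new Cached<>(f.getValue(), now));
        result.put(f.getKey(), f.getValue());
      }
    }
    return result;
  }
}
```

Since expiry is only checked on read, a row can be served stale for up to one TTL after the external table changes; that is the trade-off a TTL makes for periodically updated tables (point 2).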

Is there a better way to implement this? Could we add some abstraction for it to the Beam model?

What do you think?

Best,
JingsongLee

Re: Join to external table

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
Hi,

Note that there's a JIRA about this -
https://issues.apache.org/jira/browse/BEAM-1197 - but no solution yet :)

On Fri, Apr 14, 2017 at 10:58 AM Dan Halperin <dh...@google.com.invalid>
wrote:

> Hi Jingsong,
>
> This seems like a fantastic, reusable pattern to add, and indeed it's a
> fairly common one. There are probably some interesting API issues too --
> such as how you make a nice clean interface that works for many backends
> (Bigtable? HBase? Redis? Memcache? etc.), and how you let users supply a
> caching policy.
>
> It sounds like you may have already worked through these -- would you like
> to write down what you've learned and send out a short proposal?
>
> Thanks!

Re: Join to external table

Posted by Dan Halperin <dh...@google.com.INVALID>.
Hi Jingsong,

This seems like a fantastic, reusable pattern to add, and indeed it's a
fairly common one. There are probably some interesting API issues too --
such as how you make a nice clean interface that works for many backends
(Bigtable? HBase? Redis? Memcache? etc.), and how you let users supply a
caching policy.

It sounds like you may have already worked through these -- would you like
to write down what you've learned and send out a short proposal?

Thanks!
