Posted to user@flink.apache.org by Srikanth <sr...@gmail.com> on 2016/04/21 03:05:56 UTC

Join DataStream with dimension tables?

Hello,

I have a fairly typical streaming use case but am not able to figure out how
best to implement it in Flink.
I want to join records read from a Kafka stream with one (or more) dimension
tables which are saved as flat files.

As per this JIRA <https://issues.apache.org/jira/browse/FLINK-2320>, it's not
possible to join a DataStream with a DataSet.
These tables are too big to do a collect() and join.

It would be good to read these files during startup, do a partitionByHash,
and keep the result cached.
On the DataStream, maybe do a keyBy and join.
Is something like this possible?

Srikanth
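
The idea in the question (hash-partition the dimension table once at startup, key the stream the same way, and join locally) can be modelled without Flink. The sketch below is plain Scala, not Flink code, and only illustrates the co-partitioning argument. DimRow, Event, partitionOf, loadDimension, and the sample data are all made up for the example, and the partition count merely stands in for operator parallelism.

```scala
object CoPartitionedJoin {
  // Hypothetical record types; the thread does not specify a schema.
  final case class DimRow(key: String, description: String)
  final case class Event(key: String, amount: Int)

  // The same partitioning function is used for both inputs,
  // so records with equal keys always land in the same partition.
  def partitionOf(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // "Startup" phase: split the dimension table into one in-memory map per partition.
  def loadDimension(rows: Seq[DimRow], parallelism: Int): Vector[Map[String, DimRow]] = {
    val grouped = rows.groupBy(r => partitionOf(r.key, parallelism))
    Vector.tabulate(parallelism) { p =>
      grouped.getOrElse(p, Seq.empty).map(r => r.key -> r).toMap
    }
  }

  // "Streaming" phase: route each event to its partition and look it up locally.
  // Events without a matching key are dropped (inner join).
  def join(events: Seq[Event], dim: Vector[Map[String, DimRow]]): Seq[(Event, DimRow)] =
    events.flatMap { e =>
      dim(partitionOf(e.key, dim.size)).get(e.key).map(d => (e, d))
    }

  def main(args: Array[String]): Unit = {
    val dim = loadDimension(Seq(DimRow("a", "alpha"), DimRow("b", "beta")), parallelism = 4)
    val events = Seq(Event("a", 1), Event("c", 2), Event("b", 3))
    join(events, dim).foreach(println)
  }
}
```

Because each partition only holds its own slice of the table, no single worker needs the whole table in memory, which is the point of avoiding collect().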

Re: Join DataStream with dimension tables?

Posted by Srikanth <sr...@gmail.com>.
Aljoscha,

Your thoughts on this?

Srikanth

On Mon, Apr 25, 2016 at 8:08 PM, Srikanth <sr...@gmail.com> wrote:

> Aljoscha,
>
> Looks like a potential solution. Feels a bit hacky though.
>
> I didn't quite understand why a list-backed store is used for the static
> input buffer. An inner join should emit only one record if there is a key
> match.
>
> Is it a property of the system to emit a Long.MAX_VALUE watermark when a
> finite stream source ends?
> If so, can I do something like this to read the static file in parallel?
>     val meta = env.readTextFile("S3:///path/to/file").map(...).keyBy(...)
>
> Shouldn't we also override the checkpoint handling of the custom operator?
> If so, should the checkpoint wait or fail during the initial read phase?
>
> Lohith,
> Adding a component like Cassandra just for this feels like overkill. But
> if I can't find a suitable way to do this, I might use it (or probably
> Redis).
>
> Srikanth

Re: Join DataStream with dimension tables?

Posted by Srikanth <sr...@gmail.com>.
Aljoscha,

Looks like a potential solution. Feels a bit hacky though.

I didn't quite understand why a list-backed store is used for the static
input buffer. An inner join should emit only one record if there is a key
match.

Is it a property of the system to emit a Long.MAX_VALUE watermark when a
finite stream source ends?
If so, can I do something like this to read the static file in parallel?
    val meta = env.readTextFile("S3:///path/to/file").map(...).keyBy(...)

Shouldn't we also override the checkpoint handling of the custom operator?
If so, should the checkpoint wait or fail during the initial read phase?

Lohith,
Adding a component like Cassandra just for this feels like overkill. But
if I can't find a suitable way to do this, I might use it (or probably
Redis).

Srikanth
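
The questions above (list-backed store, end-of-input watermark, checkpointing) can be made concrete with a Flink-free model of a two-input join operator. This is a sketch under stated assumptions, not the code from the gist: BufferingJoin and all of its method names are hypothetical, and it simply assumes that the end of the static input is signalled by a Long.MaxValue watermark, as the question suggests. One possible reason for a list per key shows up in the model: nothing guarantees that the static side has exactly one row per key.

```scala
import scala.collection.mutable

// Flink-free model of a two-input join operator; every name here is hypothetical.
final class BufferingJoin[K, D, E](keyOfDim: D => K, keyOfEvent: E => K) {
  // One list per key: the static side may contain several rows for the same key.
  private val table = mutable.Map.empty[K, Vector[D]]
  // Stream records that arrived before the static input was fully read.
  private val pending = mutable.ArrayBuffer.empty[E]
  private var staticDone = false

  // Static input: store the row under its key.
  def onDimRow(row: D): Unit = {
    val k = keyOfDim(row)
    table(k) = table.getOrElse(k, Vector.empty) :+ row
  }

  // Watermark from the static input; Long.MaxValue is taken to mean "input finished".
  // On that signal the buffered stream records are joined and emitted once.
  def onDimWatermark(timestamp: Long): Seq[(E, D)] =
    if (timestamp == Long.MaxValue && !staticDone) {
      staticDone = true
      val out = pending.toList.flatMap(joinOne)
      pending.clear()
      out
    } else Nil

  // Stream input: join immediately if the table is complete, otherwise buffer.
  def onEvent(event: E): Seq[(E, D)] =
    if (staticDone) joinOne(event)
    else { pending += event; Nil }

  private def joinOne(event: E): List[(E, D)] =
    table.getOrElse(keyOfEvent(event), Vector.empty).toList.map(d => (event, d))

  // In this model a checkpoint would have to capture all three pieces of state.
  def snapshot: (Map[K, Vector[D]], List[E], Boolean) =
    (table.toMap, pending.toList, staticDone)
}

object BufferingJoinDemo {
  def main(args: Array[String]): Unit = {
    val op = new BufferingJoin[String, (String, String), (String, Int)](_._1, _._1)
    println(op.onEvent(("a", 1)))             // arrives early, so it is buffered
    op.onDimRow(("a", "alpha"))
    op.onDimRow(("a", "alpha-2"))             // second row for the same key
    println(op.onDimWatermark(Long.MaxValue)) // static input done: flush the buffer
    println(op.onEvent(("a", 2)))             // joined immediately from now on
  }
}
```

In this model a snapshot taken during the initial read phase is still consistent as long as it includes the partially loaded table and the buffered records; whether a real operator should instead delay or fail such a checkpoint is the open question raised above.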



On Fri, Apr 22, 2016 at 12:20 PM, Lohith Samaga M <Lohith.Samaga@mphasis.com> wrote:

> Hi,
> Cassandra could be used as a distributed cache.
>
> Lohith.

Re: Join DataStream with dimension tables?

Posted by Lohith Samaga M <Lo...@mphasis.com>.
Hi,
Cassandra could be used as a distributed cache.

Lohith.

Information transmitted by this e-mail is proprietary to Mphasis, its associated companies and/ or its customers and is intended 
for use only by the individual or entity to which it is addressed, and may contain information that is privileged, confidential or 
exempt from disclosure under applicable law. If you are not the intended recipient or it appears that this mail has been forwarded 
to you without proper authority, you are notified that any use or dissemination of this information in any manner is strictly 
prohibited. In such cases, please notify us immediately at mailmaster@mphasis.com and delete this mail from your records.

Re: Join DataStream with dimension tables?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Srikanth,
that's an interesting use case. It's not possible to do something like this
out of the box, but I'm actually working on an API for such cases.

In the meantime, I programmed a short example that shows how something
like this can be done using the API that is currently available. It
requires writing a custom operator but it is still somewhat succinct:
https://gist.github.com/aljoscha/c657b98b4017282693a67f1238c88906

Please let me know if you have any questions.

Cheers,
Aljoscha
