Posted to user@flink.apache.org by Biplob Biswas <re...@gmail.com> on 2017/07/28 13:34:21 UTC

Flink QueryableState with Sliding Window on RocksDB

Hi,

We recently moved from Spark Streaming to Flink for our organization's stream
processing, and we are trying to reduce the number of external calls as much
as possible. Earlier we were using HBase to store the incoming data, but we
now want to try out stateful operations on top of Flink.

To that end, we have decided that we need a sliding window of size 180 days
with a slide interval of 1 day, so that we keep 180 days of state at any
given time. This state would be at most around 40-50 GB for the 180 days, so
we thought of using RocksDB for state storage.
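
For a sense of scale: with a 180-day window and a 1-day slide, every event
belongs to 180 overlapping windows. The sketch below is a plain-Java model of
sliding-window assignment, not Flink code. The names SlidingWindowModel and
windowStarts are made up for illustration, and the model assumes windows
aligned to multiples of the slide with no offset.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of sliding-window assignment (hypothetical helper, not Flink API).
class SlidingWindowModel {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Returns the start of every window [start, start + sizeMs) that contains timestampMs.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

Calling windowStarts(ts, 180 * DAY_MS, DAY_MS) yields 180 window starts for any
non-negative timestamp ts, which is one reason a window this long with this
short a slide can be costly to keep in state.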

The job flow we have in mind takes the incoming events plus some extra
information:

events.keyBy(eventTuple -> eventTuple.getEventUID()).flatMap(new UpdatedTxnState());

where UpdatedTxnState extends RichFlatMapFunction and looks something like
this:


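import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class UpdatedTxnState
    extends RichFlatMapFunction<Tuple3<String, List<String>, EventType>, Tuple2<String, EventType>> {

  private ValueState<Tuple3<EventType, List<String>, String>> txnState;

  @Override
  public void open(Configuration config) throws Exception {
    // Value state holding the latest record per key, exposed as queryable state
    ValueStateDescriptor<Tuple3<EventType, List<String>, String>> stateDescriptor =
        new ValueStateDescriptor<>(
            "transaction",
            TypeInformation.of(new TypeHint<Tuple3<EventType, List<String>, String>>() {}));

    stateDescriptor.setQueryable("transaction");

    this.txnState = getRuntimeContext().getState(stateDescriptor);
  }

  @Override
  public void flatMap(Tuple3<String, List<String>, EventType> input,
                      Collector<Tuple2<String, EventType>> output) throws Exception {
    // Flink's Java tuples expose their fields as f0, f1, f2
    txnState.update(new Tuple3<>(input.f2, input.f1, input.f0));

    output.collect(new Tuple2<>(input.f0, input.f2));
  }
}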


Now, I have a couple of questions:
1. How can I create a sliding window on top of this state? I can think of
doing a keyBy on the output of the flatMap, but that doesn't make much sense
to me, and I didn't find a way to build state after windowing.
2. Can I query the state by the name I defined here, "transaction", from
anywhere in my job?

Thanks,
Biplob



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-QueryableState-with-Sliding-Window-on-RocksDB-tp14514.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Flink QueryableState with Sliding Window on RocksDB

Posted by Biplob Biswas <re...@gmail.com>.
Hi Fabian,

I am not really sure that using a CoProcessFunction would be useful for my use
case. In short, my use case is as follows:

1) Create 2 different local state stores, each holding a 1-N relationship,
e.g. 1 -> [A,B,C] and A -> [1,2,3].
2) Based on the key A, get the list of elements [1,2,3], then iterate over
this list and, for each of the keys 1, 2, 3, query the second store to get its
list of elements.
3) Do this to a depth of 1.
4) Based on key A again, perform merge operations and emit the merged
output.
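
The lookup described in the steps above can be modelled in plain Java roughly
as follows. This is only a sketch of the access pattern, not Flink code: the
class TwoStoreLookup, its two maps, and the add/expand methods are
hypothetical names, and the "merge" is assumed to be a simple set union
because the thread does not say what the real merge does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Plain-Java model of two stores holding a 1-N relationship in each direction.
class TwoStoreLookup {
    // e.g. 1 -> [A, B, C]
    private final Map<Integer, List<String>> idToLetters = new HashMap<>();
    // e.g. A -> [1, 2, 3]
    private final Map<String, List<Integer>> letterToIds = new HashMap<>();

    // Records the relationship in both stores.
    void add(int id, String letter) {
        idToLetters.computeIfAbsent(id, k -> new ArrayList<>()).add(letter);
        letterToIds.computeIfAbsent(letter, k -> new ArrayList<>()).add(id);
    }

    // Depth-1 expansion: letter -> ids -> letters, merged into one sorted set.
    SortedSet<String> expand(String letter) {
        SortedSet<String> merged = new TreeSet<>();
        for (int id : letterToIds.getOrDefault(letter, Collections.emptyList())) {
            // This lookup uses a key (id) different from the key being processed (letter).
            merged.addAll(idToLetters.getOrDefault(id, Collections.emptyList()));
        }
        return merged;
    }
}
```

The inner lookup is the crux: it reads the second store under keys other than
the one currently being processed, which is the random access that state
scoped to the current key does not offer.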

So I can't imagine keeping the 2 keyed states together when I need to query
them randomly and not just on the key of one store. That's why we need 2
queryable states that can be queried together in the next operator.

That's why I am not very optimistic about the CoProcessFunction for my case.
Maybe I am wrong and have missed something, so any insights would be
useful!

Regards
Biplob





Re: Flink QueryableState with Sliding Window on RocksDB

Posted by Fabian Hueske <fh...@gmail.com>.
Having an operator that updates state from one stream and queries it to
process the other stream is actually a common pattern.
As I said, I don't know your use case but I don't think that a
CoProcessFunction would result in a mess.

QueryableState will have quite a bit of overhead because the request and
response will always go over the network.

In reply to Biplob Biswas <re...@gmail.com>, 2017-07-31 15:23 GMT+02:00.

Re: Flink QueryableState with Sliding Window on RocksDB

Posted by Biplob Biswas <re...@gmail.com>.
Hi Fabian,

Thanks for the insight. I am currently exploring QueryableStateClient and
will try to get the value for a given key using its getKvState() method.
I was confused about the job ID, but I am expecting that this would give me
the job ID of the current job:
ExecutionEnvironment.getExecutionEnvironment().getId()

I thought about doing the update and the lookup in a single operator, but
I believe my code would then be a mess, with a lot of logic and no logical
separation, so that's the last thing I want to try.

My team is also exploring KStreams with KTables, and they claim that this does
what we want to do; I will update with any further information. If possible,
we would also like to contribute to Flink so that keyed state can be used
natively within the same job.

Thanks for the help,
Biplob




Re: Flink QueryableState with Sliding Window on RocksDB

Posted by Fabian Hueske <fh...@gmail.com>.
I am not sure that this is impossible, but it is not the use case queryable
state was designed for.
I don't know the details of your application, but you could try to merge
the updating and the querying operators into a single one.

You could connect two streams with connect() and use a keyed
CoProcessFunction. This has the advantage that all state accesses are
local.
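
To make the idea concrete, here is a minimal plain-Java model of a single
operator with two inputs that share per-key state. It is a sketch of the
pattern, not Flink code: UpdateAndQueryOperator, onUpdate and onQuery are
hypothetical names. In a real job the map would be keyed state, and the two
methods would roughly correspond to a CoProcessFunction's processElement1 and
processElement2.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of one operator fed by an update stream and a query stream.
class UpdateAndQueryOperator {
    // Stands in for keyed state: one value per key, held locally by the operator.
    private final Map<String, String> stateByKey = new HashMap<>();

    // Input 1 (update stream): overwrite the value stored for this key.
    void onUpdate(String key, String value) {
        stateByKey.put(key, value);
    }

    // Input 2 (query stream): read the value stored for the same key, if any.
    Optional<String> onQuery(String key) {
        return Optional.ofNullable(stateByKey.get(key));
    }
}
```

Both inputs touch the same local store, so no request leaves the operator.
The model also shows the constraint: a query only sees the value stored under
its own key.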

Cheers, Fabian

In reply to Biplob Biswas <re...@gmail.com>, 2017-07-31 13:24 GMT+02:00.

Re: Flink QueryableState with Sliding Window on RocksDB

Posted by Biplob Biswas <re...@gmail.com>.
Hi Fabian,

I read about the process function and it seems a perfect fit for my
requirement. However, while reading more about queryable state I found that
it's not meant for lookups within a job (your comment in the following link).

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-practices-to-maintain-reference-data-for-Flink-Jobs-td13215.html#a13233

The thing is, I wanted to maintain a local state store that is queryable
within the same job, such that one operator creates and updates it and
another operator queries the store based on a given key.

I was hoping this would be possible, but it seems not to be the case. Can you
shed some light and, if possible, recommend some alternatives?

Regards,
Biplob




Re: Flink QueryableState with Sliding Window on RocksDB

Posted by Biplob Biswas <re...@gmail.com>.
Hi Fabian,

Thanks a lot for pointing that out. I will read about it and give it a try.

Regards,
Biplob




Re: Flink QueryableState with Sliding Window on RocksDB

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Biplob,

Given these requirements, I would rather not use a window but implement the
functionality with a stateful ProcessFunction.
A ProcessFunction can register timers, e.g., to remove inactive state. The
state of a ProcessFunction can be made queryable.
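
The timer idea can be modelled in plain Java as below. This is a sketch of the
logic only, not Flink code: ExpiringKeyedState and its methods are
hypothetical names, and timestamps are passed in by the caller. In a
ProcessFunction the two maps would be keyed state, update would register a
timer for the returned time, and onTimer would be the timer callback.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of per-key state that is removed after a period without updates.
class ExpiringKeyedState {
    private final long ttlMs;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> lastUpdate = new HashMap<>();

    ExpiringKeyedState(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // Per record: replace the value, remember the update time, and return
    // the time at which a cleanup timer for this update should fire.
    long update(String key, String value, long nowMs) {
        values.put(key, value);
        lastUpdate.put(key, nowMs);
        return nowMs + ttlMs;
    }

    // Timer callback: clear the key only if no newer update has arrived since
    // the timer was set; timers belonging to older updates are ignored.
    void onTimer(String key, long timerMs) {
        Long last = lastUpdate.get(key);
        if (last != null && timerMs >= last + ttlMs) {
            values.remove(key);
            lastUpdate.remove(key);
        }
    }

    // Returns the current value for the key, or null if absent or expired.
    String get(String key) {
        return values.get(key);
    }
}
```

Comparing the timer's time against the last update is what lets every update
extend the lifetime of its key without having to cancel earlier timers.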

Best, Fabian

In reply to Biplob Biswas <re...@gmail.com>, 2017-07-31 9:52 GMT+02:00.

Re: Flink QueryableState with Sliding Window on RocksDB

Posted by Biplob Biswas <re...@gmail.com>.
Thanks, Fabian, for the reply. I was reconsidering my design and the
requirements, and what I described earlier is partially confusing.

I realized that a session window fits this scenario better: I want a value to
be updated per key, with the session's gap period restarting on every update,
and the value removed once the gap expires.

From what I have read so far about session windows (and windows in general),
I can perform aggregates over them. What I was thinking was that, rather than
performing an aggregate or reduce, I could simply replace the old value.

Now my problem is: would such a state be queryable?

As you mentioned that sliding windows are defined on streams, is it the same
for session windows?

"I basically want a state which updates by key, and where records
self-destruct after a fixed amount of time if not updated (basically, records
being part of a session), so that the state doesn't grow indefinitely."

Are there any abstractions that do this and, if not, can we discuss whether
it can be done some other way?

Thanks a lot,
Biplob




Re: Flink QueryableState with Sliding Window on RocksDB

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Biplob,

What do you mean by "creating a sliding window on top of a state"?
Sliding windows are typically defined on streams (data in motion) and not
on state (data at rest).

It seems that UpdatedTxnState always holds the last record that was
received per key. Do you want to compute the windows always on the last
records you received per key?
This would mean that you need to retract the overwritten values, i.e.,
remove them from the result, and add the new values to the result.
This can be implemented but requires a good design. You cannot use the
built-in sliding windows of the DataStream API because they do not support
retraction.
I think you have to implement this functionality yourself.
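
As a small illustration of what retraction means here, the plain-Java sketch
below keeps a running result over the last value per key. It is not Flink
code: RetractingSum and upsert are hypothetical names, and a sum is used only
as an example aggregate because the thread does not say what is to be
computed.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of an aggregate over the latest value per key.
class RetractingSum {
    private final Map<String, Long> lastValueByKey = new HashMap<>();
    private long total = 0;

    // Overwrites the key's value and returns the updated total.
    long upsert(String key, long newValue) {
        Long old = lastValueByKey.put(key, newValue);
        if (old != null) {
            total -= old; // retract the overwritten value from the result
        }
        total += newValue; // add the new value to the result
        return total;
    }
}
```

For instance, upsert("a", 10), upsert("b", 5) and then upsert("a", 3) leave
the total at 8 rather than 18, because the first value for "a" is retracted
when it is overwritten.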

Best, Fabian





In reply to Biplob Biswas <re...@gmail.com>, 2017-07-28 15:34 GMT+02:00.