Posted to user@flink.apache.org by vinay patil <vi...@gmail.com> on 2016/09/01 14:13:01 UTC

Re: Streaming - memory management

Hi Fabian/Stephan,

Waiting for your suggestion

Regards,
Vinay Patil

On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <vi...@gmail.com>
wrote:

> Hi Fabian/Stephan,
>
> This makes things clear.
>
> This is the use case I have:
> I am performing an outer join on two streams (in a window), after which I
> get matchingAndNonMatchingStream. I want to make sure that the matching
> rate is high (matching cannot happen if one of the sources is not emitting
> elements for a certain time). To tackle this situation, I was thinking of
> using RocksDB as the state backend and inserting the unmatched records into
> it (the key will be the same as the one used for the window, and the value
> will be the DTO). Before inserting a record, I will check whether its key is
> already present in RocksDB; if yes, I will take the data from it and send it
> downstream (and make sure I clear the state for that key).
> (The stored data should also be encrypted; the encryption part can be
> handled.)
>
> So, instead of using Cassandra, can I do this using RocksDB as the state
> backend, since the state is not gone after checkpointing?
>
> P.S. I have kept the watermark behind by 1500 secs just to be safe when
> handling late elements, but to tackle edge-case scenarios like the one
> mentioned above we have a backup plan of using Cassandra as an external
> store, since we are dealing with financially critical data.
>
> Regards,
> Vinay Patil
>
> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Vinay,
>>
>> if you use user-defined state, you have to manually clear it.
>> Otherwise, it will stay in the state backend (heap or RocksDB) until the
>> job goes down (planned or due to an OOM error).
>>
>> This is especially important to keep in mind when using keyed state.
>> If you have an unbounded, evolving key space, you will likely run out of
>> memory.
>> The job will constantly add state for each new key but won't be able to
>> clean up the state for "expired" keys.
>>
>> You could implement a clean-up mechanism for this if you implement a
>> custom stream operator.
>> However, this is a very low-level interface and requires a solid
>> understanding of the internals like timestamps, watermarks, and the
>> checkpointing mechanism.
>>
>> The community is currently working on a state expiry feature (state will
>> be discarded if not requested or updated for x minutes).
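
A minimal sketch of the manual clean-up and the idle expiry described above, modeled in plain Java rather than with Flink's state API. ExpiringKeyedState, Slot, and every method name here are hypothetical, and the expiry rule (drop an entry that was neither read nor written for ttlMillis) is only an assumption based on the "not requested or updated for x minutes" wording.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Plain-Java model of keyed state with manual clear() and idle expiry.
// Hypothetical names throughout; this is not Flink's state API.
class ExpiringKeyedState<K, V> {

    // One stored value plus the time it was last read or written.
    private static final class Slot<T> {
        final T value;
        final long lastAccessMillis;

        Slot(T value, long lastAccessMillis) {
            this.value = value;
            this.lastAccessMillis = lastAccessMillis;
        }
    }

    private final Map<K, Slot<V>> slots = new HashMap<>();
    private final long ttlMillis;

    ExpiringKeyedState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Writes the value for a key and records the access time.
    void update(K key, V value, long nowMillis) {
        slots.put(key, new Slot<>(value, nowMillis));
    }

    // Reads the value for a key (null if absent) and refreshes its access time.
    V value(K key, long nowMillis) {
        Slot<V> slot = slots.get(key);
        if (slot == null) {
            return null;
        }
        slots.put(key, new Slot<>(slot.value, nowMillis));
        return slot.value;
    }

    // Manual clean-up: without this call (or expire), the entry stays forever.
    void clear(K key) {
        slots.remove(key);
    }

    // Drops every entry idle for at least ttlMillis; returns how many were dropped.
    int expire(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<K, Slot<V>>> it = slots.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue().lastAccessMillis >= ttlMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return slots.size();
    }
}
```

With an unbounded key space, size() only ever grows unless clear() or expire() is called, which is the out-of-memory risk mentioned above.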
>>
>> Regarding the second question: Does state remain local after
>> checkpointing?
>> Yes, the local state is only copied to the remote FS (HDFS, S3, ...) but
>> remains in the operator. So the state is not gone after a checkpoint is
>> completed.
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-08-31 18:17 GMT+02:00 Vinay Patil <vi...@gmail.com>:
>>
>> > Hi Stephan,
>> >
>> > Just wanted to jump into this discussion regarding state.
>> >
>> > So do you mean that if we maintain user-defined state (for non-window
>> > operators) and do not clear it explicitly, the data for that key
>> > remains in RocksDB?
>> >
>> > What happens in the case of a checkpoint? I read in the documentation
>> > that after the checkpoint happens the RocksDB data is pushed to the
>> > desired location (HDFS, S3, or another FS). So, for user-defined state,
>> > does the data still remain in RocksDB after the checkpoint?
>> >
>> > Correct me if I have misunderstood this concept.
>> >
>> > For one of our use cases we were going for this, but after reading the
>> > above part in the documentation we are going for Cassandra now (to
>> > store records and query them for a special case).
>> >
>> > Regards,
>> > Vinay Patil
>> >
>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <se...@apache.org> wrote:
>> >
>> > > In streaming, memory is mainly needed for state (key/value state). The
>> > > exact representation depends on the chosen StateBackend.
>> > >
>> > > State is explicitly released: for windows, state is cleaned up
>> > > automatically (firing / expiry); for user-defined state, keys have to
>> > > be explicitly cleared (clear() method) or, in the future, will have the
>> > > option to expire.
>> > >
>> > > The heavy workhorse for streaming state is currently RocksDB, which
>> > > internally uses native (off-heap) memory to keep the data.
>> > >
>> > > Does that help?
>> > >
>> > > Stephan
>> > >
>> > >
>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <roshan@hortonworks.com>
>> > > wrote:
>> > >
>> > > > As per the docs, in Batch mode, dynamic memory allocation is avoided
>> > > > by storing messages being processed in ByteBuffers via Unsafe methods.
>> > > >
>> > > > Couldn't find any docs describing mem mgmt in Streaming mode. So...
>> > > >
>> > > > - Am wondering if this is also the case with Streaming?
>> > > >
>> > > > - If so, how does Flink detect that an object is no longer being used
>> > > > and can be reclaimed for reuse once again?
>> > > >
>> > > > -roshan
>> > > >
>> > >
>> >
>>
>
>





Re: Streaming - memory management

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks for the pointer and sorry for the late answer.
I guess that depends on the semantics of "checkpointing". In Flink's
terminology this means creating a copy of the state (and writing the copy
to the external FS). It does not mean that the state is migrated or moved
to the external FS.
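
A minimal sketch of that "copy, not move" semantics, as a plain-Java model and not Flink's actual checkpointing code. CheckpointCopyModel and its methods are hypothetical names, and a HashMap stands in for the local state backend.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of an operator's local state and its checkpoints.
// Hypothetical names; this only illustrates the semantics, not Flink internals.
class CheckpointCopyModel {

    // Stand-in for the local state held by the state backend.
    private final Map<String, String> localState = new HashMap<>();

    void put(String key, String value) {
        localState.put(key, value);
    }

    String get(String key) {
        return localState.get(key);
    }

    // A checkpoint returns an independent copy (what would be written to the
    // external FS); the local state is left untouched.
    Map<String, String> checkpoint() {
        return new HashMap<>(localState);
    }

    // Recovery replaces the local state with the contents of a snapshot.
    void restore(Map<String, String> snapshot) {
        localState.clear();
        localState.putAll(snapshot);
    }
}
```

After checkpoint() the operator can still read every key locally, and later writes do not change the snapshot that was already taken.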

Best, Fabian

2016-09-01 20:53 GMT+02:00 vinay patil <vi...@gmail.com>:

> Hi Fabian,
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/state_backends.html#the-rocksdbstatebackend
>
> I am referring to this; it does not clearly state whether the state will be
> maintained on local disk even after checkpointing.
>
> Or I am not getting it correctly :)
>
> Regards,
> Vinay Patil
>
> On Thu, Sep 1, 2016 at 1:38 PM, Fabian Hueske-2 [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
>> You do not have to convert your DTO into a JSON object to use it as
>> key-value state in a Flink function.
>> You can pass it as it is via the state interfaces.
>>
>> Can you point me to the documentation that you find confusing? The state
>> documentation [1] says:
>>
>> >> You can make *every* transformation (map, filter, etc) stateful by
>> >> using Flink’s state interface or checkpointing instance fields of your
>> >> function.
>> >> You can register any instance field as *managed* state by
>> >> implementing an interface.
>> >> In this case, and also in the case of using Flink’s native state
>> >> interface, Flink will automatically take consistent snapshots of your
>> >> state periodically, and restore its value in the case of a failure.
>>
>> Is that unclear/confusing or are you referring to a different paragraph?
>>
>> Thanks, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>
>> 2016-09-01 20:22 GMT+02:00 vinay patil <[hidden email]>:
>>
>>> I don't want to join a third stream.
>>>
>>> And yes, this is what I was thinking of as well:
>>> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy().flatMap(// backup join)
>>>
>>> I am already done integrating with Cassandra, but I feel RocksDB will be
>>> a better option. I will have to take care of the clearing part as you have
>>> suggested; I will check that in the documentation.
>>>
>>> I have a DTO with almost 50 fields. Converting it to JSON and storing it
>>> as state should not be a problem, or is there no harm in storing the DTO
>>> itself?
>>>
>>> I think the documentation should state explicitly that the state will be
>>> maintained for user-defined operators, to avoid confusion.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Sep 1, 2016 at 1:12 PM, Fabian Hueske-2 [via Apache Flink User
>>> Mailing List archive.] <[hidden email]> wrote:
>>>
>>>> I thought you would like to join the non-matched elements with another
>>>> (third) stream.
>>>>
>>>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy().connect(s3.keyBy()).coFlatMap(// backup join)
>>>>
>>>> If you want to match the non-matched stream with itself, a
>>>> FlatMapFunction is the right choice.
>>>>
>>>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy().flatMap(// backup join)
>>>>
>>>> The backup join puts all non-matched elements in the state and waits for
>>>> another non-matched element with the same key to do the join.
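
A minimal sketch of the backup join logic described above, written in plain Java with a HashMap standing in for keyed state. BackupJoinSketch, onUnmatched, and the "left|right" output format are hypothetical; in a real job this logic would presumably live in a FlatMapFunction on the keyed stream and use Flink's state interfaces instead of the map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the backup join: park an unmatched record per key and
// join it with the next unmatched record that arrives for the same key.
// Hypothetical names; the HashMap stands in for keyed state.
class BackupJoinSketch {

    private final Map<String, String> pending = new HashMap<>();

    // Handles one unmatched record; returns the joined results (zero or one).
    List<String> onUnmatched(String key, String record) {
        List<String> out = new ArrayList<>();
        // remove() both reads and clears the state for this key, so a matched
        // key does not keep occupying state afterwards.
        String waiting = pending.remove(key);
        if (waiting == null) {
            // First unmatched record for this key: park it and wait.
            pending.put(key, record);
        } else {
            // Second unmatched record with the same key: emit the join.
            out.add(waiting + "|" + record);
        }
        return out;
    }

    // Number of keys still waiting for a partner.
    int pendingCount() {
        return pending.size();
    }
}
```

Records whose partner never arrives stay in pending indefinitely, which is why the clean-up discussed earlier in the thread still matters here.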
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>>
>>>> 2016-09-01 19:55 GMT+02:00 vinay patil <[hidden email]>:
>>>>
>>>>> Yes, that's what I am looking for.
>>>>>
>>>>> But why use a CoFlatMapFunction? I have already got the
>>>>> matchingAndNonMatching stream by doing the union of the two streams and
>>>>> putting the outer-join logic in the apply method.
>>>>>
>>>>> I am thinking of applying the same key on matchingAndNonMatching and
>>>>> using a flatMap to take care of the rest of the logic.
>>>>>
>>>>> Or are you suggesting to use a CoFlatMapFunction after the outer-join
>>>>> operation (I mean after doing the window and getting the
>>>>> matchingAndNonMatching stream)?
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Thu, Sep 1, 2016 at 11:38 AM, Fabian Hueske-2 [via Apache Flink
>>>>> User Mailing List archive.] <[hidden email]> wrote:
>>>>>
>>>>>> Thanks for the explanation. I think I understood your use case.
>>>>>>
>>>>>> Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a
>>>>>> keyed stream (keyed by join key).
>>>>>> One input would be the unmatched outer join records, the other input
>>>>>> would serve the events you want to match them with.
>>>>>> Retrieving elements from RocksDB will be local and should be fast.
>>>>>>
>>>>>> You should be confident, though, that all unmatched records will be
>>>>>> picked up at some point (RocksDB persists to disk, so you won't run
>>>>>> out of memory, but the snapshot size will increase).
>>>>>> The future state expiry feature will avoid such situations.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2016-09-01 18:29 GMT+02:00 vinay patil <[hidden email]>:
>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>> I had already used the CoGroup function earlier but was getting some
>>>>>>> issues while dealing with watermarks (for one use case I was not
>>>>>>> getting the correct result), so I have used the union operator for
>>>>>>> performing the outer join (WindowFunction on a KeyedStream). This
>>>>>>> approach is working correctly and giving me correct results.
>>>>>>>
>>>>>>> As I have described in the scenario, I want to maintain the
>>>>>>> non-matching records in some store. That's why I was thinking of using
>>>>>>> RocksDB as the store here: I will maintain the user-defined state
>>>>>>> after the outer-join window operator, and I can query it using Flink
>>>>>>> to check whether the value for a particular key is present; if it is,
>>>>>>> I can match the records and send the result downstream.
>>>>>>>
>>>>>>> The final goal is to have zero non-matching records, so this is the
>>>>>>> backup plan to handle edge-case scenarios.
>>>>>>>
>>>>>>> I have already integrated code to write to Cassandra using the Flink
>>>>>>> connector, but I think this will be a better option than querying an
>>>>>>> external store: since RocksDB will store the data on the local TM
>>>>>>> disk, retrieval will be faster here than with Cassandra, right?
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Vinay Patil
>>>>>>>
>>>>>>> On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske-2 [via Apache Flink
>>>>>>> User Mailing List archive.] <[hidden email]> wrote:
>>>>>>>
>>>>>>>> Hi Vinay,
>>>>>>>>
>>>>>>>> can you give a bit more detail about how you plan to implement the
>>>>>>>> outer join? Using a WindowFunction or a CoFlatMapFunction on a KeyedStream?
>>>>>>>>
>>>>>>>> An alternative could be to use a CoGroup operator, which collects
>>>>>>>> from two inputs all elements that share a common key (the join key) and
>>>>>>>> are in the same window. The interface of the function provides two
>>>>>>>> iterators over the elements of both inputs and can be used to implement
>>>>>>>> outer join functionality. The benefit of working with a CoGroupFunction
>>>>>>>> is that you do not have to take care of state handling at all.
>>>>>>>>
>>>>>>>> In case you go for a custom implementation, you will need to work
>>>>>>>> with operator state.
>>>>>>>> However, you do not need to interact with RocksDB directly. Flink
>>>>>>>> takes care of that for you.
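
A minimal sketch of how the two iterators of a CoGroup-style function can yield a full outer join for one key and one window. It is plain Java, not Flink's CoGroupFunction interface; OuterJoinCoGroupSketch and the String[] {left, right} pair format are hypothetical, with null marking the missing side.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a CoGroup-style outer join for a single key and window.
// Hypothetical names; a missing side is represented by null.
class OuterJoinCoGroupSketch {

    static List<String[]> coGroup(Iterable<String> left, Iterable<String> right) {
        // Materialize both sides so that emptiness can be checked and the
        // right side can be iterated once per left element.
        List<String> lefts = new ArrayList<>();
        left.forEach(lefts::add);
        List<String> rights = new ArrayList<>();
        right.forEach(rights::add);

        List<String[]> out = new ArrayList<>();
        if (rights.isEmpty()) {
            // Left records without a partner: pad the right side with null.
            for (String l : lefts) {
                out.add(new String[] {l, null});
            }
        } else if (lefts.isEmpty()) {
            // Right records without a partner: pad the left side with null.
            for (String r : rights) {
                out.add(new String[] {null, r});
            }
        } else {
            // Both sides present: emit every left/right combination.
            for (String l : lefts) {
                for (String r : rights) {
                    out.add(new String[] {l, r});
                }
            }
        }
        return out;
    }
}
```

The null-padded pairs are the non-matching records that the rest of this thread is about handling.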
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>

Re: Streaming - memory management

Posted by vinay patil <vi...@gmail.com>.
Hi Fabian,

https://ci.apache.org/projects/flink/flink-docs-master/dev/state_backends.html#the-rocksdbstatebackend

I am referring to this; it does not clearly state whether the state will be
maintained on local disk even after checkpointing.

Or I am not getting it correctly :)

Regards,
Vinay Patil

On Thu, Sep 1, 2016 at 1:38 PM, Fabian Hueske-2 [via Apache Flink User
Mailing List archive.] <ml...@n4.nabble.com> wrote:

> You do not have to convert your DTO into a JSON object to use it as a
> key-value state in a Flink function.
> You can pass it as it is via the state interfaces.
>
> Can you point me to the documentation that you find confusing? The state
> documentation [1] says:
>
> >> You can make *every* transformation (map, filter, etc) stateful by
> using Flink’s state interface or checkpointing instance fields of your
> function.
> >> You can register any instance field as *managed* state by implementing
> an interface.
> >> In this case, and also in the case of using Flink’s native state
> interface, Flink will automatically take consistent snapshots of your state
> periodically, and restore its value in the case of a failure.
>
> Is that unclear/confusing or are you referring to different paragraph?
>
> Thanks, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.1/apis/streaming/state.html
>
> 2016-09-01 20:22 GMT+02:00 vinay patil <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=8847&i=0>>:
>
>> I don't to join the third stream.
>>
>> And Yes, This is what I was thinking of.also :
>> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(//
>> backup join)
>>
>>
>> I am already done integrating with Cassandra but I feel RocksDB will be a
>> better option, I will have to take care of the clearing part as you have
>> suggested, will check that in documentation.
>>
>> I have the DTO with almost 50 fields , converting it to JSON and storing
>> it as a state should not be a problem , or there is no harm in storing the
>> DTO ?
>>
>> I think the documentation should specify the point that the state will be
>> maintained for user-defined operators to avoid confusion.
>>
>> Regards,
>> Vinay Patil
>>
>> On Thu, Sep 1, 2016 at 1:12 PM, Fabian Hueske-2 [via Apache Flink User
>> Mailing List archive.] <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=8845&i=0>> wrote:
>>
>>> I thought you would like to join the non-matched elements with another
>>> (third) stream.
>>>
>>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.connect(
>>> s3.keyBy).coFlatMap(// backup join)
>>>
>>> If you want to match the non-matched stream with itself a
>>> FlatMapFunction is the right choice.
>>>
>>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(//
>>> backup join)
>>>
>>> The backup join puts all non-match elements in the state and waits for
>>> another non-matched element with the same key to do the join.
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> 2016-09-01 19:55 GMT+02:00 vinay patil <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=8843&i=0>>:
>>>
>>>> Yes, that's what I am looking for.
>>>>
>>>> But why to use CoFlatMapFunction , I have already got the
>>>> matchingAndNonMatching Stream , by doing the union of two streams and
>>>> having the logic in apply method for performing outer-join.
>>>>
>>>> I am thinking of applying the same key on matchingAndNonMatching and
>>>> flatmap to take care of rest logic.
>>>>
>>>> Or are you suggestion to use Co-FlatMapFunction after the outer-join
>>>> operation  (I mean after doing the window and
>>>> getting matchingAndNonMatching stream )?
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Thu, Sep 1, 2016 at 11:38 AM, Fabian Hueske-2 [via Apache Flink User
>>>> Mailing List archive.] <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node&node=8842&i=0>> wrote:
>>>>
>>>>> Thanks for the explanation. I think I understood your usecase.
>>>>>
>>>>> Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed
>>>>> stream (keyed by join key).
>>>>> One input would be the unmatched outer join records, the other input
>>>>> would serve the events you want to match them with.
>>>>> Retrieving elements from RocksDB will be local and should be fast.
>>>>>
>>>>> You should be confident though, that all unmatched record will be
>>>>> picked up at some point (RocksDB persists to disk, so you won't run out of
>>>>> memory but snapshots size will increase).
>>>>> The future state expiry feature will avoid such situations.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-09-01 18:29 GMT+02:00 vinay patil <[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node&node=8837&i=0>>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> I had already used Co-Group function earlier but were getting some
>>>>>> issues while dealing with watermarks (for one use case I was not getting
>>>>>> the correct result), so I have used the union operator for performing the
>>>>>> outer-join (WindowFunction on a keyedStream), this approach is working
>>>>>> correctly and giving me correct results.
>>>>>>
>>>>>> As I have discussed the scenario, I want to maintain the non-matching
>>>>>> records in some store, so that's why I was thinking of using RocksDB as a
>>>>>> store here, where I will maintain the user-defined state  after the
>>>>>> outer-join window operator, and I can query it using Flink to check if the
>>>>>> value for a particular key is present or not , if present I can match them
>>>>>> and send it downstream.
>>>>>>
>>>>>> The final goal is to have zero non-matching records, so this is the
>>>>>> backup plan to handle edge-case scenarios.
>>>>>>
>>>>>> I have already integrated code to write to Cassandra using Flink
>>>>>> Connector, but I think this will be a better option rather than hitting the
>>>>>> query to external store since RocksDb will store the data to local TM disk,
>>>>>> the retrieval will be faster here than Cassandra , right ?
>>>>>>
>>>>>> What do you think ?
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Vinay Patil
>>>>>>
>>>>>> On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske-2 [via Apache Flink
>>>>>> User Mailing List archive.] wrote:
>>>>>>
>>>>>>> Hi Vinay,
>>>>>>>
>>>>>>> can you give a bit more detail about how you plan to implement the
>>>>>>> outer join? Using a WindowFunction or a CoFlatMapFunction on a KeyedStream?
>>>>>>>
>>>>>>> An alternative could be to use a CoGroup operator which collects
>>>>>>> from two inputs all elements that share a common key (the join key) and are
>>>>>>> in the same window. The interface of the function provides two iterators
>>>>>>> over the elements of both inputs and can be used to implement outer join
>>>>>>> functionality. The benefit of working with a CoGroupFunction is that you do
>>>>>>> not have to take care of state handling at all.
>>>>>>>
>>>>>>> In case you go for a custom implementation you will need to work
>>>>>>> with operator state.
>>>>>>> However, you do not need to directly interact with RocksDB. Flink is
>>>>>>> taking care of that for you.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2016-09-01 16:13 GMT+02:00 vinay patil:
>>>>>>>
>>>>>>>> Hi Fabian/Stephan,
>>>>>>>>
>>>>>>>> Waiting for your suggestion
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Vinay Patil
>>>>>>>>
>>>>>>>> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil wrote:
>>>>>>>>
>>>>>>>>> Hi Fabian/Stephan,
>>>>>>>>>
>>>>>>>>> This makes things clear.
>>>>>>>>>
>>>>>>>>> This is the use case I have :
>>>>>>>>> I am performing a outer join operation on the two streams (in
>>>>>>>>> window) after which I get matchingAndNonMatchingStream, now I want to make
>>>>>>>>> sure that the matching rate is high (matching cannot happen if one of the
>>>>>>>>> source is not emitting elements for certain time) , so to tackle this
>>>>>>>>> situation I was thinking of using RocksDB as a state Backend, where I will
>>>>>>>>> insert the unmatched records in it (key - will be same as used for window
>>>>>>>>> and value will be DTO ), so before inserting into it I will check if it is
>>>>>>>>> already present in RocksDB, if yes I will take the data from it and send it
>>>>>>>>> downstream (and ensure I perform the clean operation for that key).
>>>>>>>>> (Also the data to store should be encrypted, encryption part can
>>>>>>>>> be handled )
>>>>>>>>>
>>>>>>>>> so instead of using Cassandra , Can I do this using RocksDB as
>>>>>>>>> state backend since the state is not gone after checkpointing ?
>>>>>>>>>
>>>>>>>>> P.S I have kept the watermark behind by 1500 secs just to be safe
>>>>>>>>> on handling late elements but to tackle edge case scenarios like the one
>>>>>>>>> mentioned above we are having a backup plan of using Cassandra as external
>>>>>>>>> store since we are dealing with financial critical data.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Vinay Patil
>>>>>>>>>
>>>>>>>>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske wrote:
>>>>>>>>>
>>>>>>>>>> Hi Vinay,
>>>>>>>>>>
>>>>>>>>>> if you use user-defined state, you have to manually clear it.
>>>>>>>>>> Otherwise, it will stay in the state backend (heap or RocksDB)
>>>>>>>>>> until the
>>>>>>>>>> job goes down (planned or due to an OOM error).
>>>>>>>>>>
>>>>>>>>>> This is esp. important to keep in mind, when using keyed state.
>>>>>>>>>> If you have an unbounded, evolving key space you will likely run
>>>>>>>>>> out-of-memory.
>>>>>>>>>> The job will constantly add state for each new key but won't be
>>>>>>>>>> able to
>>>>>>>>>> clean up the state for "expired" keys.
>>>>>>>>>>
>>>>>>>>>> You could implement a clean-up mechanism for this if you implement a
>>>>>>>>>> custom stream operator.
>>>>>>>>>> However this is a very low level interface and requires solid
>>>>>>>>>> understanding
>>>>>>>>>> of the internals like timestamps, watermarks and the checkpointing
>>>>>>>>>> mechanism.
>>>>>>>>>>
>>>>>>>>>> The community is currently working on a state expiry feature
>>>>>>>>>> (state will be
>>>>>>>>>> discarded if not requested or updated for x minutes).
>>>>>>>>>>
>>>>>>>>>> Regarding the second question: Does state remain local after
>>>>>>>>>> checkpointing?
>>>>>>>>>> Yes, the local state is only copied to the remote FS (HDFS, S3,
>>>>>>>>>> ...) but
>>>>>>>>>> remains in the operator. So the state is not gone after a
>>>>>>>>>> checkpoint is
>>>>>>>>>> completed.
>>>>>>>>>>
>>>>>>>>>> Hope this helps,
>>>>>>>>>> Fabian
>>>>>>>>>>
>>>>>>>>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil:
>>>>>>>>>>
>>>>>>>>>> > Hi Stephan,
>>>>>>>>>> >
>>>>>>>>>> > Just wanted to jump into this discussion regarding state.
>>>>>>>>>> >
>>>>>>>>>> > So do you mean that if we maintain user-defined state (for non-window
>>>>>>>>>> > operators), then if we do not clear it explicitly will the data for that
>>>>>>>>>> > key remains in RocksDB.
>>>>>>>>>> >
>>>>>>>>>> > What happens in case of checkpoint ? I read in the documentation that after
>>>>>>>>>> > the checkpoint happens the rocksDB data is pushed to the desired location
>>>>>>>>>> > (hdfs or s3 or other fs), so for user-defined state does the data still
>>>>>>>>>> > remain in RocksDB after checkpoint ?
>>>>>>>>>> >
>>>>>>>>>> > Correct me if I have misunderstood this concept
>>>>>>>>>> >
>>>>>>>>>> > For one of our use we were going for this, but since I read the above part
>>>>>>>>>> > in documentation so we are going for Cassandra now (to store records and
>>>>>>>>>> > query them for a special case)
>>>>>>>>>> >
>>>>>>>>>> > Regards,
>>>>>>>>>> > Vinay Patil
>>>>>>>>>> >
>>>>>>>>>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen wrote:
>>>>>>>>>> >
>>>>>>>>>> > > In streaming, memory is mainly needed for state (key/value state). The
>>>>>>>>>> > > exact representation depends on the chosen StateBackend.
>>>>>>>>>> > >
>>>>>>>>>> > > State is explicitly released: For windows, state is cleaned up
>>>>>>>>>> > > automatically (firing / expiry), for user-defined state, keys have to be
>>>>>>>>>> > > explicitly cleared (clear() method) or in the future will have the option
>>>>>>>>>> > > to expire.
>>>>>>>>>> > >
>>>>>>>>>> > > The heavy work horse for streaming state is currently RocksDB, which
>>>>>>>>>> > > internally uses native (off-heap) memory to keep the data.
>>>>>>>>>> > >
>>>>>>>>>> > > Does that help?
>>>>>>>>>> > >
>>>>>>>>>> > > Stephan
>>>>>>>>>> > >
>>>>>>>>>> > >
>>>>>>>>>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik wrote:
>>>>>>>>>> > >
>>>>>>>>>> > > > As per the docs, in Batch mode, dynamic memory allocation is avoided by
>>>>>>>>>> > > > storing messages being processed in ByteBuffers via Unsafe methods.
>>>>>>>>>> > > >
>>>>>>>>>> > > > Couldn't find any docs describing mem mgmt in Streaming mode. So...
>>>>>>>>>> > > >
>>>>>>>>>> > > > - Am wondering if this is also the case with Streaming ?
>>>>>>>>>> > > >
>>>>>>>>>> > > > - If so, how does Flink detect that an object is no longer being used and
>>>>>>>>>> > > > can be reclaimed for reuse once again ?
>>>>>>>>>> > > >
>>>>>>>>>> > > > -roshan
>>>>>>>>>> > > >
>>>>>>>>>> > >
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Streaming-memory-management-tp8829p8849.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Streaming - memory management

Posted by Fabian Hueske <fh...@gmail.com>.
You do not have to convert your DTO into a JSON object to use it as a
key-value state in a Flink function.
You can pass it as it is via the state interfaces.
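
A minimal sketch of that point, assuming a hypothetical `TradeDto` and a plain-Java stand-in (`KeyedValueSlot`) for per-key value state. This is not Flink API; it only mirrors the value/update/clear shape of Flink's `ValueState`, where the key is implicit on a keyed stream and the configured backend (heap or RocksDB) takes care of storage. The DTO is kept as a typed object, with no JSON step:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical DTO; the real one in this thread has almost 50 fields.
final class TradeDto {
    final String id;
    final long amountCents;

    TradeDto(String id, long amountCents) {
        this.id = id;
        this.amountCents = amountCents;
    }
}

// Plain-Java stand-in for per-key value state (NOT Flink API).
// In Flink the key is implicit on a keyed stream; here it is passed explicitly.
final class KeyedValueSlot<T> {
    private final Map<String, T> byKey = new HashMap<>();

    // Returns the stored object, or null if nothing is stored for the key.
    T value(String key) { return byKey.get(key); }

    // Stores the object as-is, without converting it to JSON.
    void update(String key, T value) { byKey.put(key, value); }

    // Explicit clean-up; entries stay until this is called.
    void clear(String key) { byKey.remove(key); }
}

final class DtoStateSketch {
    public static void main(String[] args) {
        KeyedValueSlot<TradeDto> state = new KeyedValueSlot<>();
        state.update("k1", new TradeDto("t-1", 1250L));
        TradeDto stored = state.value("k1");
        System.out.println(stored.id + " " + stored.amountCents);
        state.clear("k1");
        System.out.println(state.value("k1"));
    }
}
```

In a real job the map would be replaced by state obtained through Flink's state interfaces, and the entry for a key would remain until it is cleared explicitly.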

Can you point me to the documentation that you find confusing? The state
documentation [1] says:

>> You can make *every* transformation (map, filter, etc) stateful by using
>> Flink’s state interface or checkpointing instance fields of your function.
>> You can register any instance field as *managed* state by implementing
>> an interface.
>> In this case, and also in the case of using Flink’s native state
>> interface, Flink will automatically take consistent snapshots of your state
>> periodically, and restore its value in the case of a failure.

Is that unclear/confusing or are you referring to a different paragraph?

Thanks, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html

2016-09-01 20:22 GMT+02:00 vinay patil <vi...@gmail.com>:

> I don't want to join a third stream.
>
> And yes, this is what I was thinking of as well:
> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(// backup
> join)
>
>
> I have already finished integrating with Cassandra, but I feel RocksDB will
> be a better option. I will have to take care of the clearing part as you
> suggested, and will check that in the documentation.
>
> I have a DTO with almost 50 fields. Converting it to JSON and storing it
> as state should not be a problem, or is there no harm in storing the DTO
> directly?
>
> To avoid confusion, I think the documentation should state explicitly that
> state is retained for user-defined operators.
>
> Regards,
> Vinay Patil
>
> On Thu, Sep 1, 2016 at 1:12 PM, Fabian Hueske-2 [via Apache Flink User
> Mailing List archive.] wrote:
>
>> I thought you would like to join the non-matched elements with another
>> (third) stream.
>>
>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.connect(s3.keyBy).coFlatMap(//
>> backup join)
>>
>> If you want to match the non-matched stream with itself a FlatMapFunction
>> is the right choice.
>>
>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(//
>> backup join)
>>
>> The backup join puts all non-matched elements in the state and waits for
>> another non-matched element with the same key to do the join.
>>
>> Best, Fabian
>>
>>
>>
>> 2016-09-01 19:55 GMT+02:00 vinay patil:
>>
>>> Yes, that's what I am looking for.
>>>
>>> But why to use CoFlatMapFunction , I have already got the
>>> matchingAndNonMatching Stream , by doing the union of two streams and
>>> having the logic in apply method for performing outer-join.
>>>
>>> I am thinking of applying the same key on matchingAndNonMatching and
>>> flatmap to take care of rest logic.
>>>
>>> Or are you suggestion to use Co-FlatMapFunction after the outer-join
>>> operation  (I mean after doing the window and
>>> getting matchingAndNonMatching stream )?
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Sep 1, 2016 at 11:38 AM, Fabian Hueske-2 [via Apache Flink User
>>> Mailing List archive.] wrote:
>>>
>>>> Thanks for the explanation. I think I understood your usecase.
>>>>
>>>> Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed
>>>> stream (keyed by join key).
>>>> One input would be the unmatched outer join records, the other input
>>>> would serve the events you want to match them with.
>>>> Retrieving elements from RocksDB will be local and should be fast.
>>>>
>>>> You should be confident, though, that all unmatched records will be
>>>> picked up at some point (RocksDB persists to disk, so you won't run out of
>>>> memory, but snapshot size will increase).
>>>> The future state expiry feature will avoid such situations.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-09-01 18:29 GMT+02:00 vinay patil:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> I had already used the CoGroup function earlier but was getting some
>>>>> issues while dealing with watermarks (for one use case I was not getting
>>>>> the correct result), so I have used the union operator to perform the
>>>>> outer join (WindowFunction on a KeyedStream). This approach is working
>>>>> and giving me correct results.
>>>>>
>>>>> As I have discussed the scenario, I want to maintain the non-matching
>>>>> records in some store, so that's why I was thinking of using RocksDB as a
>>>>> store here, where I will maintain the user-defined state  after the
>>>>> outer-join window operator, and I can query it using Flink to check if the
>>>>> value for a particular key is present or not , if present I can match them
>>>>> and send it downstream.
>>>>>
>>>>> The final goal is to have zero non-matching records, so this is the
>>>>> backup plan to handle edge-case scenarios.
>>>>>
>>>>> I have already integrated code to write to Cassandra using Flink
>>>>> Connector, but I think this will be a better option rather than hitting the
>>>>> query to external store since RocksDb will store the data to local TM disk,
>>>>> the retrieval will be faster here than Cassandra , right ?
>>>>>
>>>>> What do you think ?
>>>>>
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske-2 [via Apache Flink
>>>>> User Mailing List archive.] wrote:
>>>>>
>>>>>> Hi Vinay,
>>>>>>
>>>>>> can you give a bit more detail about how you plan to implement the
>>>>>> outer join? Using a WindowFunction or a CoFlatMapFunction on a KeyedStream?
>>>>>>
>>>>>> An alternative could be to use a CoGroup operator which collects from
>>>>>> two inputs all elements that share a common key (the join key) and are in
>>>>>> the same window. The interface of the function provides two iterators over
>>>>>> the elements of both inputs and can be used to implement outer join
>>>>>> functionality. The benefit of working with a CoGroupFunction is that you do
>>>>>> not have to take care of state handling at all.
>>>>>>
>>>>>> In case you go for a custom implementation you will need to work with
>>>>>> operator state.
>>>>>> However, you do not need to directly interact with RocksDB. Flink is
>>>>>> taking care of that for you.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2016-09-01 16:13 GMT+02:00 vinay patil:
>>>>>>
>>>>>>> Hi Fabian/Stephan,
>>>>>>>
>>>>>>> Waiting for your suggestion
>>>>>>>
>>>>>>> Regards,
>>>>>>> Vinay Patil
>>>>>>>
>>>>>>> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil wrote:
>>>>>>>
>>>>>>>> Hi Fabian/Stephan,
>>>>>>>>
>>>>>>>> This makes things clear.
>>>>>>>>
>>>>>>>> This is the use case I have :
>>>>>>>> I am performing a outer join operation on the two streams (in
>>>>>>>> window) after which I get matchingAndNonMatchingStream, now I want to make
>>>>>>>> sure that the matching rate is high (matching cannot happen if one of the
>>>>>>>> source is not emitting elements for certain time) , so to tackle this
>>>>>>>> situation I was thinking of using RocksDB as a state Backend, where I will
>>>>>>>> insert the unmatched records in it (key - will be same as used for window
>>>>>>>> and value will be DTO ), so before inserting into it I will check if it is
>>>>>>>> already present in RocksDB, if yes I will take the data from it and send it
>>>>>>>> downstream (and ensure I perform the clean operation for that key).
>>>>>>>> (Also the data to store should be encrypted, encryption part can be
>>>>>>>> handled )
>>>>>>>>
>>>>>>>> so instead of using Cassandra , Can I do this using RocksDB as
>>>>>>>> state backend since the state is not gone after checkpointing ?
>>>>>>>>
>>>>>>>> P.S I have kept the watermark behind by 1500 secs just to be safe
>>>>>>>> on handling late elements but to tackle edge case scenarios like the one
>>>>>>>> mentioned above we are having a backup plan of using Cassandra as external
>>>>>>>> store since we are dealing with financial critical data.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Vinay Patil
>>>>>>>>
>>>>>>>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske wrote:
>>>>>>>>
>>>>>>>>> Hi Vinay,
>>>>>>>>>
>>>>>>>>> if you use user-defined state, you have to manually clear it.
>>>>>>>>> Otherwise, it will stay in the state backend (heap or RocksDB)
>>>>>>>>> until the
>>>>>>>>> job goes down (planned or due to an OOM error).
>>>>>>>>>
>>>>>>>>> This is esp. important to keep in mind, when using keyed state.
>>>>>>>>> If you have an unbounded, evolving key space you will likely run
>>>>>>>>> out-of-memory.
>>>>>>>>> The job will constantly add state for each new key but won't be
>>>>>>>>> able to
>>>>>>>>> clean up the state for "expired" keys.
>>>>>>>>>
>>>>>>>>> You could implement a clean-up mechanism for this if you implement a
>>>>>>>>> custom stream operator.
>>>>>>>>> However this is a very low level interface and requires solid
>>>>>>>>> understanding
>>>>>>>>> of the internals like timestamps, watermarks and the checkpointing
>>>>>>>>> mechanism.
>>>>>>>>>
>>>>>>>>> The community is currently working on a state expiry feature
>>>>>>>>> (state will be
>>>>>>>>> discarded if not requested or updated for x minutes).
>>>>>>>>>
>>>>>>>>> Regarding the second question: Does state remain local after
>>>>>>>>> checkpointing?
>>>>>>>>> Yes, the local state is only copied to the remote FS (HDFS, S3,
>>>>>>>>> ...) but
>>>>>>>>> remains in the operator. So the state is not gone after a
>>>>>>>>> checkpoint is
>>>>>>>>> completed.
>>>>>>>>>
>>>>>>>>> Hope this helps,
>>>>>>>>> Fabian
>>>>>>>>>
>>>>>>>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil:
>>>>>>>>>
>>>>>>>>> > Hi Stephan,
>>>>>>>>> >
>>>>>>>>> > Just wanted to jump into this discussion regarding state.
>>>>>>>>> >
>>>>>>>>> > So do you mean that if we maintain user-defined state (for non-window
>>>>>>>>> > operators), then if we do not clear it explicitly will the data for that
>>>>>>>>> > key remains in RocksDB.
>>>>>>>>> >
>>>>>>>>> > What happens in case of checkpoint ? I read in the documentation that after
>>>>>>>>> > the checkpoint happens the rocksDB data is pushed to the desired location
>>>>>>>>> > (hdfs or s3 or other fs), so for user-defined state does the data still
>>>>>>>>> > remain in RocksDB after checkpoint ?
>>>>>>>>> >
>>>>>>>>> > Correct me if I have misunderstood this concept
>>>>>>>>> >
>>>>>>>>> > For one of our use we were going for this, but since I read the above part
>>>>>>>>> > in documentation so we are going for Cassandra now (to store records and
>>>>>>>>> > query them for a special case)
>>>>>>>>> >
>>>>>>>>> > Regards,
>>>>>>>>> > Vinay Patil
>>>>>>>>> >
>>>>>>>>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen wrote:
>>>>>>>>> >
>>>>>>>>> > > In streaming, memory is mainly needed for state (key/value state). The
>>>>>>>>> > > exact representation depends on the chosen StateBackend.
>>>>>>>>> > >
>>>>>>>>> > > State is explicitly released: For windows, state is cleaned up
>>>>>>>>> > > automatically (firing / expiry), for user-defined state, keys have to be
>>>>>>>>> > > explicitly cleared (clear() method) or in the future will have the option
>>>>>>>>> > > to expire.
>>>>>>>>> > >
>>>>>>>>> > > The heavy work horse for streaming state is currently RocksDB, which
>>>>>>>>> > > internally uses native (off-heap) memory to keep the data.
>>>>>>>>> > >
>>>>>>>>> > > Does that help?
>>>>>>>>> > >
>>>>>>>>> > > Stephan
>>>>>>>>> > >
>>>>>>>>> > >
>>>>>>>>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik wrote:
>>>>>>>>> > >
>>>>>>>>> > > > As per the docs, in Batch mode, dynamic memory allocation is avoided by
>>>>>>>>> > > > storing messages being processed in ByteBuffers via Unsafe methods.
>>>>>>>>> > > >
>>>>>>>>> > > > Couldn't find any docs describing mem mgmt in Streaming mode. So...
>>>>>>>>> > > >
>>>>>>>>> > > > - Am wondering if this is also the case with Streaming ?
>>>>>>>>> > > >
>>>>>>>>> > > > - If so, how does Flink detect that an object is no longer being used and
>>>>>>>>> > > > can be reclaimed for reuse once again ?
>>>>>>>>> > > >
>>>>>>>>> > > > -roshan
>>>>>>>>> > > >
>>>>>>>>> > >
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>

Re: Streaming - memory management

Posted by vinay patil <vi...@gmail.com>.
I don't want to join the third stream.

And yes, this is what I was thinking of as well:
s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(// backup join)


I have already finished integrating with Cassandra, but I feel RocksDB will be a
better option. I will have to take care of clearing the state, as you
suggested; I will check the documentation for that.

I have a DTO with almost 50 fields. Converting it to JSON and storing it
as state should not be a problem, or is there no harm in storing the DTO
directly?

I think the documentation should state explicitly that state is
maintained for user-defined operators, to avoid confusion.

Regards,
Vinay Patil

On Thu, Sep 1, 2016 at 1:12 PM, Fabian Hueske-2 [via Apache Flink User
Mailing List archive.] <ml...@n4.nabble.com> wrote:

> I thought you would like to join the non-matched elements with another
> (third) stream.
>
> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.connect(s3.keyBy).coFlatMap(// backup join)
>
> If you want to match the non-matched stream with itself, a FlatMapFunction
> is the right choice.
>
> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(// backup join)
>
> The backup join puts all non-matched elements into state and waits for
> another non-matched element with the same key to do the join.
>
> Best, Fabian

Re: Streaming - memory management

Posted by Fabian Hueske <fh...@gmail.com>.
I thought you would like to join the non-matched elements with another
(third) stream.

--> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.connect(s3.keyBy).coFlatMap(// backup join)
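
As a rough illustration of this two-input variant, here is a plain-Java sketch. It is not Flink API code: the class `TwoInputBackupJoin`, the methods `onUnmatched` and `onThird`, and the two `HashMap`s standing in for keyed state are hypothetical names chosen for the example. Buffering elements of the third stream that arrive before their partner is an assumption, and the joined result is a concatenated string only to keep the sketch short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch of a two-input backup join; NOT Flink API code.
// Each map stands in for the keyed state of one input.
class TwoInputBackupJoin<K, A, B> {

    private final Map<K, A> unmatched = new HashMap<>();   // records from the outer join
    private final Map<K, B> thirdStream = new HashMap<>(); // elements from s3

    // Input 1: a non-matched record coming out of the windowed outer join.
    Optional<String> onUnmatched(K key, A record) {
        // remove() reads and clears the buffered partner in one step.
        B partner = thirdStream.remove(key);
        if (partner == null) {
            unmatched.put(key, record); // no partner yet: buffer and wait
            return Optional.empty();
        }
        return Optional.of(record + "+" + partner);
    }

    // Input 2: an element of the third stream.
    Optional<String> onThird(K key, B event) {
        A partner = unmatched.remove(key);
        if (partner == null) {
            thirdStream.put(key, event); // assumption: early s3 elements are buffered too
            return Optional.empty();
        }
        return Optional.of(partner + "+" + event);
    }
}
```

In an actual job, the two methods would correspond to the two callbacks of a CoFlatMapFunction applied to the connected, keyed streams.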

If you want to match the non-matched stream with itself, a FlatMapFunction
is the right choice.

--> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(// backup join)

The backup join puts all non-matched elements into state and waits for
another non-matched element with the same key to do the join.
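
The buffer-and-match logic of this backup join can be sketched in plain Java, independent of Flink. This is a minimal illustration, not Flink API code: the class `BackupJoin`, its `process` method, the `Pair` result type, and the `HashMap` standing in for keyed state are hypothetical names chosen for the example. In a real job the map would be replaced by keyed state (for example a `ValueState`) inside the flatMap function, and removing the entry would correspond to calling `clear()` on that state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch of the backup join; NOT Flink API code.
// The HashMap is a stand-in for per-key state held by the state backend.
class BackupJoin<K, V> {

    // Hypothetical result type: the two records that were joined.
    static final class Pair<V> {
        final V first;
        final V second;

        Pair(V first, V second) {
            this.first = first;
            this.second = second;
        }
    }

    private final Map<K, V> pending = new HashMap<>();

    // Called once per non-matched record, with the join key of that record.
    Optional<Pair<V>> process(K key, V record) {
        // remove() both reads and clears the buffered record for this key,
        // so matched keys do not accumulate in state.
        V buffered = pending.remove(key);
        if (buffered == null) {
            pending.put(key, record); // first record for this key: wait for its partner
            return Optional.empty();
        }
        return Optional.of(new Pair<>(buffered, record));
    }

    // Number of keys still waiting for a partner.
    int pendingKeys() {
        return pending.size();
    }
}
```

Keys that never receive a partner stay in `pending` indefinitely, which is why the state has to be cleared explicitly.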

Best, Fabian



2016-09-01 19:55 GMT+02:00 vinay patil <vi...@gmail.com>:

> Yes, that's what I am looking for.
>
> But why use a CoFlatMapFunction? I already have the
> matchingAndNonMatching stream, obtained by taking the union of the two streams
> and putting the outer-join logic in the apply method.
>
> I am thinking of keying matchingAndNonMatching by the same key and using a
> flatMap to take care of the rest of the logic.
>
> Or are you suggesting that I use a CoFlatMapFunction after the outer-join
> operation (I mean after the window, once I have the
> matchingAndNonMatching stream)?
>
> Regards,
> Vinay Patil

Re: Streaming - memory management

Posted by vinay patil <vi...@gmail.com>.
Yes, that's what I am looking for.

But why use a CoFlatMapFunction? I already have the
matchingAndNonMatching stream, obtained by taking the union of the two streams
and putting the outer-join logic in the apply method.

I am thinking of keying matchingAndNonMatching by the same key and using a
flatMap to take care of the rest of the logic.

Or are you suggesting that I use a CoFlatMapFunction after the outer-join
operation (I mean after the window, once I have the
matchingAndNonMatching stream)?

Regards,
Vinay Patil

On Thu, Sep 1, 2016 at 11:38 AM, Fabian Hueske-2 [via Apache Flink User
Mailing List archive.] <ml...@n4.nabble.com> wrote:

> Thanks for the explanation. I think I understand your use case.
>
> Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed
> stream (keyed by the join key).
> One input would be the unmatched outer-join records; the other input would
> serve the events you want to match them with.
> Retrieving elements from RocksDB will be local and should be fast.
>
> You should be confident, though, that all unmatched records will be picked
> up at some point (RocksDB persists to disk, so you won't run out of memory,
> but the snapshot size will increase).
> The future state expiry feature will avoid such situations.
>
> Best, Fabian
>>> operator state.
>>> However, you do not need to directly interact with RocksDB. Flink is
>>> taking care of that for you.
>>>
>>> Best, Fabian
>>>
>>> 2016-09-01 16:13 GMT+02:00 vinay patil <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=8832&i=0>>:
>>>
>>>> Hi Fabian/Stephan,
>>>>
>>>> Waiting for your suggestion
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node&node=8829&i=0>> wrote:
>>>>
>>>>> Hi Fabian/Stephan,
>>>>>
>>>>> This makes things clear.
>>>>>
>>>>> This is the use case I have :
>>>>> I am performing a outer join operation on the two streams (in window)
>>>>> after which I get matchingAndNonMatchingStream, now I want to make sure
>>>>> that the matching rate is high (matching cannot happen if one of the source
>>>>> is not emitting elements for certain time) , so to tackle this situation I
>>>>> was thinking of using RocksDB as a state Backend, where I will insert the
>>>>> unmatched records in it (key - will be same as used for window and value
>>>>> will be DTO ), so before inserting into it I will check if it is already
>>>>> present in RocksDB, if yes I will take the data from it and send it
>>>>> downstream (and ensure I perform the clean operation for that key).
>>>>> (Also the data to store should be encrypted, encryption part can be
>>>>> handled )
>>>>>
>>>>> so instead of using Cassandra , Can I do this using RocksDB as state
>>>>> backend since the state is not gone after checkpointing ?
>>>>>
>>>>> P.S I have kept the watermark behind by 1500 secs just to be safe on
>>>>> handling late elements but to tackle edge case scenarios like the one
>>>>> mentioned above we are having a backup plan of using Cassandra as external
>>>>> store since we are dealing with financial critical data.
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske <[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node&node=8829&i=1>> wrote:
>>>>>
>>>>>> Hi Vinaj,
>>>>>>
>>>>>> if you use user-defined state, you have to manually clear it.
>>>>>> Otherwise, it will stay in the state backend (heap or RocksDB) until
>>>>>> the
>>>>>> job goes down (planned or due to an OOM error).
>>>>>>
>>>>>> This is esp. important to keep in mind, when using keyed state.
>>>>>> If you have an unbounded, evolving key space you will likely run
>>>>>> out-of-memory.
>>>>>> The job will constantly add state for each new key but won't be able
>>>>>> to
>>>>>> clean up the state for "expired" keys.
>>>>>>
>>>>>> You could implement a clean-up mechanism this if you implement a
>>>>>> custom
>>>>>> stream operator.
>>>>>> However this is a very low level interface and requires solid
>>>>>> understanding
>>>>>> of the internals like timestamps, watermarks and the checkpointing
>>>>>> mechanism.
>>>>>>
>>>>>> The community is currently working on a state expiry feature (state
>>>>>> will be
>>>>>> discarded if not requested or updated for x minutes).
>>>>>>
>>>>>> Regarding the second question: Does state remain local after
>>>>>> checkpointing?
>>>>>> Yes, the local state is only copied to the remote FS (HDFS, S3, ...)
>>>>>> but
>>>>>> remains in the operator. So the state is not gone after a checkpoint
>>>>>> is
>>>>>> completed.
>>>>>>
>>>>>> Hope this helps,
>>>>>> Fabian
>>>>>>
>>>>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil <[hidden email]
>>>>>> <http:///user/SendEmail.jtp?type=node&node=8829&i=2>>:
>>>>>>
>>>>>> > Hi Stephan,
>>>>>> >
>>>>>> > Just wanted to jump into this discussion regarding state.
>>>>>> >
>>>>>> > So do you mean that if we maintain user-defined state (for
>>>>>> non-window
>>>>>> > operators), then if we do  not clear it explicitly will the data
>>>>>> for that
>>>>>> > key remains in RocksDB.
>>>>>> >
>>>>>> > What happens in case of checkpoint ? I read in the documentation
>>>>>> that after
>>>>>> > the checkpoint happens the rocksDB data is pushed to the desired
>>>>>> location
>>>>>> > (hdfs or s3 or other fs), so for user-defined state does the data
>>>>>> still
>>>>>> > remain in RocksDB after checkpoint ?
>>>>>> >
>>>>>> > Correct me if I have misunderstood this concept
>>>>>> >
>>>>>> > For one of our use we were going for this, but since I read the
>>>>>> above part
>>>>>> > in documentation so we are going for Cassandra now (to store
>>>>>> records and
>>>>>> > query them for a special case)
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > Regards,
>>>>>> > Vinay Patil
>>>>>> >
>>>>>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <[hidden email]
>>>>>> <http:///user/SendEmail.jtp?type=node&node=8829&i=3>> wrote:
>>>>>> >
>>>>>> > > In streaming, memory is mainly needed for state (key/value
>>>>>> state). The
>>>>>> > > exact representation depends on the chosen StateBackend.
>>>>>> > >
>>>>>> > > State is explicitly released: For windows, state is cleaned up
>>>>>> > > automatically (firing / expiry), for user-defined state, keys
>>>>>> have to be
>>>>>> > > explicitly cleared (clear() method) or in the future will have
>>>>>> the option
>>>>>> > > to expire.
>>>>>> > >
>>>>>> > > The heavy work horse for streaming state is currently RocksDB,
>>>>>> which
>>>>>> > > internally uses native (off-heap) memory to keep the data.
>>>>>> > >
>>>>>> > > Does that help?
>>>>>> > >
>>>>>> > > Stephan
>>>>>> > >
>>>>>> > >
>>>>>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <[hidden email]
>>>>>> <http:///user/SendEmail.jtp?type=node&node=8829&i=4>>
>>>>>> > > wrote:
>>>>>> > >
>>>>>> > > > As per the docs, in Batch mode, dynamic memory allocation is
>>>>>> avoided by
>>>>>> > > > storing messages being processed in ByteBuffers via Unsafe
>>>>>> methods.
>>>>>> > > >
>>>>>> > > > Couldn't find any docs  describing mem mgmt in Streamingn mode.
>>>>>> So...
>>>>>> > > >
>>>>>> > > > - Am wondering if this is also the case with Streaming ?
>>>>>> > > >
>>>>>> > > > - If so, how does Flink detect that an object is no longer
>>>>>> being used
>>>>>> > and
>>>>>> > > > can be reclaimed for reuse once again ?
>>>>>> > > >
>>>>>> > > > -roshan
>>>>>> > > >
>>>>>> > >
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>
>>>> ------------------------------
>>>> View this message in context: Re: Streaming - memory management
>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Streaming-memory-management-tp8829.html>
>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>> archive
>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>>>> at Nabble.com.
>>>>
>>>
>>>
>>>
>>> ------------------------------
>>> If you reply to this email, your message will be added to the discussion
>>> below:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>> ble.com/Re-Streaming-memory-management-tp8829p8832.html
>>> To start a new topic under Apache Flink User Mailing List archive.,
>>> email [hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=8836&i=1>
>>> To unsubscribe from Apache Flink User Mailing List archive., click here.
>>> NAML
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>>
>>
>>
>> ------------------------------
>> View this message in context: Re: Streaming - memory management
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Streaming-memory-management-tp8829p8836.html>
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>> at Nabble.com.
>>
>
>
>
> ------------------------------
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Re-Streaming-memory-management-tp8829p8837.html
> To start a new topic under Apache Flink User Mailing List archive., email
> ml-node+s2336050n1h83@n4.nabble.com
> To unsubscribe from Apache Flink User Mailing List archive., click here
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=dmluYXkxOC5wYXRpbEBnbWFpbC5jb218MXwxODExMDE2NjAx>
> .
> NAML
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>





Re: Streaming - memory management

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks for the explanation. I think I understood your use case.

Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed
stream (keyed by join key).
One input would be the unmatched outer join records, the other input would
serve the events you want to match them with.
Retrieving elements from RocksDB will be local and should be fast.
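
The matching logic described above can be sketched in plain Java, without any Flink dependency. This is only a model under stated assumptions: the class and method names are hypothetical, the HashMap stands in for per-key state that Flink would keep in RocksDB, and a candidate event with no buffered record is simply ignored. In a real job the two methods would roughly correspond to flatMap1/flatMap2 of a RichCoFlatMapFunction, and the map to a ValueState accessed through value(), update() and clear().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of the two-input matching logic (no Flink dependency).
 * All names here are hypothetical and chosen for illustration only.
 */
final class UnmatchedRecordMatcher {
    // Stand-in for keyed state: join key -> buffered unmatched record.
    private final Map<String, String> unmatchedByKey = new HashMap<>();
    // Collects what would be emitted downstream.
    private final List<String> output = new ArrayList<>();

    /** Input 1: an unmatched record coming out of the outer-join window. */
    void onUnmatched(String key, String record) {
        unmatchedByKey.put(key, record);
    }

    /** Input 2: an event that may complete a buffered record. */
    void onCandidate(String key, String event) {
        // Read and clear in one step so matched state does not accumulate.
        String buffered = unmatchedByKey.remove(key);
        if (buffered != null) {
            output.add(buffered + "|" + event);
        }
        // Assumption of this sketch: a candidate without a buffered record is dropped.
    }

    List<String> output() {
        return output;
    }

    int bufferedCount() {
        return unmatchedByKey.size();
    }
}
```

Clearing the entry at the moment of the match is the important part: it is the manual clean-up that user-defined state needs.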

You should be confident, though, that all unmatched records will be picked
up at some point (RocksDB persists to disk, so you won't run out of memory,
but the snapshot size will increase).
The future state expiry feature will avoid such situations.
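
Until such an expiry feature exists, records that are never matched have to be removed by hand. The following plain-Java sketch models one way to do that with a time-to-live per buffered record. It is not Flink's expiry feature; the class name, the TTL parameter and the explicit expire() call are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Plain-Java model of a buffer whose entries are dropped after a TTL. */
final class ExpiringUnmatchedBuffer {
    /** A buffered record together with the time it was inserted. */
    private static final class Buffered {
        final String record;
        final long insertedAtMillis;

        Buffered(String record, long insertedAtMillis) {
            this.record = record;
            this.insertedAtMillis = insertedAtMillis;
        }
    }

    private final Map<String, Buffered> entries = new HashMap<>();
    private final long ttlMillis;

    ExpiringUnmatchedBuffer(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void put(String key, String record, long nowMillis) {
        entries.put(key, new Buffered(record, nowMillis));
    }

    /** Returns and clears the record for the key, or null if there is none. */
    String take(String key) {
        Buffered b = entries.remove(key);
        return b == null ? null : b.record;
    }

    /** Drops every entry whose age is at least the TTL; returns how many were dropped. */
    int expire(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Buffered>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue().insertedAtMillis >= ttlMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return entries.size();
    }
}
```

For critical data, expired records would probably be routed to a fallback store rather than silently discarded; that part is left out of the sketch.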

Best, Fabian


Re: Streaming - memory management

Posted by vinay patil <vi...@gmail.com>.
Hi Fabian,

I had used the CoGroup function earlier but ran into some issues with
watermarks (for one use case I was not getting the correct result), so I
now use the union operator to perform the outer join (a WindowFunction on a
KeyedStream). This approach is working and gives me correct results.

As discussed, I want to keep the non-matching records in some store. That
is why I was thinking of using RocksDB here: I would maintain user-defined
state after the outer-join window operator and query it from Flink to check
whether a value is present for a particular key. If it is, I can match the
records and send the result downstream.

The final goal is to have zero non-matching records, so this is the backup
plan to handle edge-case scenarios.

I have already integrated code to write to Cassandra using the Flink
connector, but I think this will be a better option than querying an
external store. Since RocksDB stores the data on the local TM disk,
retrieval should be faster than from Cassandra, right?

What do you think?


Regards,
Vinay Patil






Re: Streaming - memory management

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Vinay,

can you give a bit more detail about how you plan to implement the outer
join? Using a WindowFunction or a CoFlatMapFunction on a KeyedStream?

An alternative could be to use a CoGroup operator which collects from two
inputs all elements that share a common key (the join key) and are in the
same window. The interface of the function provides two iterators over the
elements of both inputs and can be used to implement outer join
functionality. The benefit of working with a CoGroupFunction is that you do
not have to take care of state handling at all.
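
As a rough sketch of what the body of such a function could look like, here
is the outer join logic in plain Java. It only mirrors the shape of
CoGroupFunction.coGroup(Iterable, Iterable, Collector): two iterables that
already share one key and one window, plus an output callback. The class
OuterJoinSketch, the record type Joined and the use of
java.util.function.Consumer in place of Flink's Collector are assumptions made
for illustration so that the snippet runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in for the body of a CoGroupFunction (hypothetical names).
// Both inputs are assumed to hold only elements of one key and one window.
class OuterJoinSketch {

    /** Hypothetical output record; either side is null when it had no match. */
    static final class Joined<L, R> {
        final L left;
        final R right;

        Joined(L left, R right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return "(" + left + ", " + right + ")";
        }
    }

    static <L, R> void fullOuterJoin(Iterable<L> left, Iterable<R> right,
                                     Consumer<Joined<L, R>> out) {
        // Materialize the right side because it is traversed once per left element.
        List<R> rights = new ArrayList<>();
        for (R r : right) {
            rights.add(r);
        }

        boolean sawLeft = false;
        for (L l : left) {
            sawLeft = true;
            if (rights.isEmpty()) {
                out.accept(new Joined<>(l, null));      // left side only
            } else {
                for (R r : rights) {
                    out.accept(new Joined<>(l, r));     // matched pair
                }
            }
        }
        if (!sawLeft) {
            for (R r : rights) {
                out.accept(new Joined<>(null, r));      // right side only
            }
        }
    }
}
```

In a real job the same three branches would go inside coGroup(), with
out.collect(...) instead of out.accept(...). No user-defined state is involved
because the window operator buffers the elements.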

If you go for a custom implementation, you will need to work with
operator state.
However, you do not need to interact with RocksDB directly. Flink takes
care of that for you.
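
To illustrate the "buffer the unmatched record, emit on match, then clear"
pattern described in the quoted use case, here is a minimal plain-Java sketch.
The two HashMaps are only a stand-in for Flink's keyed state. In an actual
function the per-key value would live in a ValueState (value(), update(),
clear()), and the configured state backend decides whether it sits on the heap
or in RocksDB. The class and method names are hypothetical, and keeping only
one pending record per key and side is a simplification.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Stand-in for keyed state (hypothetical names): Map.remove() plays the role
// of reading the state and then calling clear() for that key.
class UnmatchedBufferSketch {
    private final Map<String, String> pendingLeft = new HashMap<>();
    private final Map<String, String> pendingRight = new HashMap<>();

    /** Handles a record from the first stream. */
    void onLeft(String key, String value, Consumer<String> out) {
        String other = pendingRight.remove(key);   // look up and clear in one step
        if (other != null) {
            out.accept(key + ":" + value + "+" + other);
        } else {
            pendingLeft.put(key, value);           // no partner yet, keep it
        }
    }

    /** Handles a record from the second stream. */
    void onRight(String key, String value, Consumer<String> out) {
        String other = pendingLeft.remove(key);
        if (other != null) {
            out.accept(key + ":" + other + "+" + value);
        } else {
            pendingRight.put(key, value);
        }
    }

    /** Number of records still waiting for a partner. */
    int pendingCount() {
        return pendingLeft.size() + pendingRight.size();
    }
}
```

Note that keys which never find a partner stay in the buffer forever in this
sketch, so a real implementation still needs some form of clean-up for them.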

Best, Fabian

2016-09-01 16:13 GMT+02:00 vinay patil <vi...@gmail.com>:

> Hi Fabian/Stephan,
>
> Waiting for your suggestion
>
> Regards,
> Vinay Patil
>
> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <[hidden email]> wrote:
>
>> Hi Fabian/Stephan,
>>
>> This makes things clear.
>>
>> This is the use case I have:
>> I am performing an outer join operation on the two streams (in a window),
>> after which I get matchingAndNonMatchingStream. Now I want to make sure
>> that the matching rate is high (matching cannot happen if one of the
>> sources is not emitting elements for a certain time). To tackle this
>> situation I was thinking of using RocksDB as a state backend, where I will
>> insert the unmatched records (the key will be the same as used for the
>> window and the value will be a DTO). Before inserting I will check if it
>> is already present in RocksDB; if yes, I will take the data from it and
>> send it downstream (and ensure I perform the clean operation for that key).
>> (Also, the data to store should be encrypted; the encryption part can be
>> handled.)
>>
>> So instead of using Cassandra, can I do this using RocksDB as the state
>> backend, since the state is not gone after checkpointing?
>>
>> P.S. I have kept the watermark behind by 1500 secs just to be safe on
>> handling late elements, but to tackle edge-case scenarios like the one
>> mentioned above we have a backup plan of using Cassandra as an external
>> store, since we are dealing with financially critical data.
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske <[hidden email]> wrote:
>>
>>> Hi Vinay,
>>>
>>> if you use user-defined state, you have to manually clear it.
>>> Otherwise, it will stay in the state backend (heap or RocksDB) until the
>>> job goes down (planned or due to an OOM error).
>>>
>>> This is esp. important to keep in mind, when using keyed state.
>>> If you have an unbounded, evolving key space you will likely run
>>> out-of-memory.
>>> The job will constantly add state for each new key but won't be able to
>>> clean up the state for "expired" keys.
>>>
>>> You could implement a clean-up mechanism for this if you implement a
>>> custom stream operator.
>>> However, this is a very low-level interface and requires a solid
>>> understanding of the internals like timestamps, watermarks and the
>>> checkpointing mechanism.
>>>
>>> The community is currently working on a state expiry feature (state will
>>> be discarded if not requested or updated for x minutes).
>>>
>>> Regarding the second question: Does state remain local after
>>> checkpointing?
>>> Yes, the local state is only copied to the remote FS (HDFS, S3, ...) but
>>> remains in the operator. So the state is not gone after a checkpoint is
>>> completed.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil <[hidden email]>:
>>>
>>> > Hi Stephan,
>>> >
>>> > Just wanted to jump into this discussion regarding state.
>>> >
>>> > So do you mean that if we maintain user-defined state (for non-window
>>> > operators) and do not clear it explicitly, the data for that key will
>>> > remain in RocksDB?
>>> >
>>> > What happens in case of a checkpoint? I read in the documentation that
>>> > after the checkpoint happens the RocksDB data is pushed to the desired
>>> > location (HDFS, S3 or another FS), so for user-defined state does the
>>> > data still remain in RocksDB after the checkpoint?
>>> >
>>> > Correct me if I have misunderstood this concept.
>>> >
>>> > For one of our use cases we were going for this, but since I read the
>>> > above part in the documentation we are going for Cassandra now (to store
>>> > records and query them for a special case).
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > Regards,
>>> > Vinay Patil
>>> >
>>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <[hidden email]> wrote:
>>> >
>>> > > In streaming, memory is mainly needed for state (key/value state).
>>> > > The exact representation depends on the chosen StateBackend.
>>> > >
>>> > > State is explicitly released: For windows, state is cleaned up
>>> > > automatically (firing / expiry), for user-defined state, keys have
>>> > > to be explicitly cleared (clear() method) or in the future will have
>>> > > the option to expire.
>>> > >
>>> > > The heavy work horse for streaming state is currently RocksDB, which
>>> > > internally uses native (off-heap) memory to keep the data.
>>> > >
>>> > > Does that help?
>>> > >
>>> > > Stephan
>>> > >
>>> > >
>>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <[hidden email]> wrote:
>>> > >
>>> > > > As per the docs, in Batch mode, dynamic memory allocation is
>>> > > > avoided by storing messages being processed in ByteBuffers via
>>> > > > Unsafe methods.
>>> > > >
>>> > > > Couldn't find any docs describing mem mgmt in Streaming mode.
>>> > > > So...
>>> > > >
>>> > > > - Am wondering if this is also the case with Streaming?
>>> > > >
>>> > > > - If so, how does Flink detect that an object is no longer being
>>> > > > used and can be reclaimed for reuse once again?
>>> > > >
>>> > > > -roshan
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>