Posted to user@flink.apache.org by Chakravarthy varaga <ch...@gmail.com> on 2016/09/05 15:51:43 UTC

Sharing Java Collections within Flink Cluster

Hi Team,

    I'm working on a Flink streaming application. The data is ingested
through Kafka connectors at roughly 100K events/sec, and each event
payload is a string. Let's call this DataStream1.
The application also uses another DataStream, call it DataStream2, which
consumes events off a Kafka topic. The elements of DataStream2 go through
a transformation that finally updates a HashMap (a java.util
collection). The Flink application needs to share this HashMap across the
Flink cluster so that the DataStream1 processing can check the state of
the values in this collection. Is there a way to do this in Flink?

    I don't see any shared collection that can be used within the cluster.

Best Regards
CVP

Re: Sharing Java Collections within Flink Cluster

Posted by Chakravarthy varaga <ch...@gmail.com>.
Hi Team,

      Could you guide me on this? Is this a known issue with
checkpointing?

CVP

On 22 Sep 2016 15:57, "Chakravarthy varaga" <ch...@gmail.com>
wrote:

> PFA, Flink_checkpoint_time.png in relation to this issue.
>
> On Thu, Sep 22, 2016 at 3:38 PM, Chakravarthy varaga <
> chakravarthyvp@gmail.com> wrote:
>
>> Hi Aljoscha & Fabian,
>>
>>     Finally I got this working. Thanks for your help. In terms of
>> persisting the state (for S2), I tried checkpointing every 10 seconds
>> using a FsStateBackend. What I notice is that the checkpoint duration is
>> almost 2 minutes in many cases, while in the other cases it frequently
>> varies from 100 ms to 1.5 minutes.
>>
>>     The pseudocode is as below:
>>
>>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>      KeyedStream<Tuple2<String, V>, String> ks2 =
>>          ds2.flatMap(split T into k-v pairs).keyBy(0);
>>
>>      ks1.connect(ks2).flatMap(X);
>>      // X is a CoFlatMapFunction that inserts and removes elements from
>>      // ks2 into a key-value state member. Elements from ks1 are matched
>>      // against that state.
>>
>>      // ks1 streams about 100K events/sec from a Kafka topic.
>>      // ks2 streams about 1 event every 10 minutes. Precisely when the
>>      // 1st event is consumed from this stream, the checkpoint takes
>>      // 2 minutes straightaway.
>>
>>     The Flink version is 1.1.2.
>>
>>  Best Regards
>> CVP
>>
>> On Tue, Sep 13, 2016 at 7:29 AM, Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> Hi,
>>> you don't need the BlockedEventState class; you should be able to just
>>> do this:
>>>
>>>     private transient ValueState<BlockedRoadInfo> blockedRoads;
>>>     ............
>>>     @Override
>>>     public void open(final org.apache.flink.configuration.Configuration parameters)
>>>             throws Exception {
>>>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>>>                 new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>>>                         TypeInformation.of(BlockedRoadInfo.class), null);
>>>         blockedRoads = getRuntimeContext().getState(blockedStateDesc);
>>>     }
>>>
>>> }
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>> On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga <
>>> chakravarthyvp@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>>     I'm coding to check whether your proposal works and have hit a
>>>> ClassCastException.
>>>>
>>>>
>>>>     // Here is my Value that has state information.....an
>>>> implementation of my value state... where the key is a Double value... on
>>>> connected stream ks2
>>>>
>>>>     public class BlockedEventState implements
>>>> ValueState<BlockedRoadInfo> {
>>>>
>>>>     public BlockedRoadInfo blockedRoad;
>>>>
>>>>     @Override
>>>>     public void clear() {
>>>>         blockedRoad = null;
>>>>
>>>>     }
>>>>
>>>>     @Override
>>>>     public BlockedRoadInfo value() throws IOException {
>>>>         return blockedRoad;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void update(final BlockedRoadInfo value) throws IOException {
>>>>         blockedRoad = value;
>>>>     }
>>>> }
>>>>
>>>>        //BlockedRoadInfo class...
>>>>         public class BlockedRoadInfo {
>>>>             long maxLink;
>>>>             long minLink;
>>>>             double blockedEventId;
>>>>     ....setters & ... getters
>>>> }
>>>>
>>>> /// new RichCoFlatMapFunction() {
>>>>
>>>>     private transient BlockedEventState blockedRoads;
>>>>     ............
>>>>     @Override
>>>>     public void open(final org.apache.flink.configuration.Configuration parameters)
>>>>             throws Exception {
>>>>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>>>>                 new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>>>>                         TypeInformation.of(BlockedRoadInfo.class), null);
>>>>         // FAILS HERE WITH CLASSCAST
>>>>         blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc);
>>>>     }
>>>>
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> Caused by: java.lang.ClassCastException:
>>>> org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to
>>>> com.ericsson.components.aia.iot.volvo.state.BlockedEventState
>>>>
>>>>
>>>>
>>>> I have tried to set the state backend to both MemState and FsState:
>>>> streamEnv.setStateBackend(new
>>>> FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>
>>>>
>>>>
>>>> On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Not sure if I got your requirements right, but would this work?
>>>>>
>>>>> KeyedStream<String, String> ks1 = ds1.keyBy("*") ;
>>>>> KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into
>>>>> k-v pairs).keyBy(0);
>>>>>
>>>>> ks1.connect(ks2).flatMap(X)
>>>>>
>>>>> X is a CoFlatMapFunction that inserts and removes elements from ks2
>>>>> into a key-value state member. Elements from ks1 are matched against that
>>>>> state.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <
>>>>> chakravarthyvp@gmail.com>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>>      First of all, thanks for all your prompt responses. With regard
>>>>>> to 2) Multiple lookups, I have to clarify what I mean by that...
>>>>>>
>>>>>>      DS1<String> elementKeyStream  = stream1.map(String<>); this maps
>>>>>> each of the streaming elements into string mapped value...
>>>>>>      DS2<T>  = stream2.xxx(); // where stream2 is a kafka source
>>>>>> stream, as you proposed.. xxx() should be my function() which splits the
>>>>>> string and generates key1:<value1>, key2:<value2>, key3:<value3>
>>>>>> ....keyN:<value4>
>>>>>>
>>>>>>      Now,
>>>>>>         I wish to map elementKeyStream with look ups within (key1,
>>>>>> key2...keyN) where key1, key2.. keyN and their respective values should be
>>>>>> available across the cluster...
>>>>>>
>>>>>> Thanks a million !
>>>>>> CVP
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fh...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> That depends.
>>>>>>> 1) Growing/Shrinking: This should work. New entries can always be
>>>>>>> inserted. In order to remove entries from the k-v-state you have to set the
>>>>>>> value to null. Note that you need an explicit delete-value record to
>>>>>>> trigger the eviction.
>>>>>>> 2) Multiple lookups: This only works if all lookups are
>>>>>>> independent of each other. You can partition DS1 only on a single key and
>>>>>>> the other keys might be located on different shards. A workaround might be
>>>>>>> to duplicate S1 events for each key that they need to look up. However, you
>>>>>>> might need to collect events from the same S1 event after the join. If that
>>>>>>> does not work for you, the only thing that comes to my mind is to broadcast
>>>>>>> the state and keep a full local copy in each operator.
>>>>>>>
>>>>>>> Let me add one more thing regarding the upcoming rescaling feature.
>>>>>>> If this is interesting for you, rescaling will also work (maybe not in the
>>>>>>> first version) for broadcasted state, i.e. state which is the same on all
>>>>>>> parallel operator instances.
>>>>>>>
>>>>>>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <
>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>
>>>>>>>> I understand this better with your explanation.
>>>>>>>> In this use case, each element in DS1 has to be looked up against a
>>>>>>>> 'bunch of keys' from DS2, and DS2 could shrink/expand in terms of the
>>>>>>>> number of keys. Will the key-value shard work in this case?
>>>>>>>>
>>>>>>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fh...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Operator state is always local in Flink. However, with key-value
>>>>>>>>> state, you can have something which behaves similarly to a distributed
>>>>>>>>> hashmap, because each operator holds a different shard/partition of the
>>>>>>>>> hashtable.
>>>>>>>>>
>>>>>>>>> If you have to do only a single key lookup for each element of
>>>>>>>>> DS1, you should think about partitioning both streams (keyBy) and writing
>>>>>>>>> the state into Flink's key-value state [1].
>>>>>>>>>
>>>>>>>>> This will have several benefits:
>>>>>>>>> 1) State does not need to be replicated
>>>>>>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can
>>>>>>>>> reside on disk. You are not bound to the memory of the JVM.
>>>>>>>>> 3) Flink takes care of the look-up. No need to have your own
>>>>>>>>> hashmap.
>>>>>>>>> 4) It will only be possible to rescale jobs with key-value state
>>>>>>>>> (this feature is currently under development).
>>>>>>>>>
>>>>>>>>> If using the key-value state is possible, I'd go for that.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>>>>>>
>>>>>>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <
>>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Certainly, that's what I thought as well.
>>>>>>>>>> The output of DataStream2 could be in the 1000s, and there are state
>>>>>>>>>> updates.
>>>>>>>>>> Reading this topic from the other job, job1, is okay.
>>>>>>>>>> However, assuming that we maintain this state in a collection and
>>>>>>>>>> update it by reading from the topic, will the collection be
>>>>>>>>>> replicated across the cluster within job1?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fh...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the
>>>>>>>>>>> other job an option?
>>>>>>>>>>>
>>>>>>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Fabian,
>>>>>>>>>>>>
>>>>>>>>>>>>     Thanks for your response. These DataStreams
>>>>>>>>>>>> (Job1-DataStream1 & Job2-DataStream2) are from different Flink applications
>>>>>>>>>>>> running within the same cluster.
>>>>>>>>>>>>     DataStream2 (from Job2) applies transformations and updates
>>>>>>>>>>>> a 'cache' that Job1 needs to work on.
>>>>>>>>>>>>     Our intention is not to use an external key/value store, as
>>>>>>>>>>>> we are trying to localize the cache within the cluster.
>>>>>>>>>>>>     Is there a way?
>>>>>>>>>>>>
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>> CVP
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <
>>>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Flink does not provide shared state.
>>>>>>>>>>>>> However, you can broadcast a stream to CoFlatMapFunction, such
>>>>>>>>>>>>> that each operator has its own local copy of the state.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If that does not work for you because the state is too large
>>>>>>>>>>>>> and if it is possible to partition the state (and both streams), you can
>>>>>>>>>>>>> also use keyBy instead of broadcast.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Finally, you can use an external system like a KeyValue Store
>>>>>>>>>>>>> or In-Memory store like Apache Ignite to hold your distributed collection.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      Can someone help me here? Appreciate any response !
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>> Varaga
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
>>>>>>>>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     I'm working on a Flink streaming application. The data
>>>>>>>>>>>>>>> is ingested through Kafka connectors at roughly 100K events/sec,
>>>>>>>>>>>>>>> and each event payload is a string. Let's call this DataStream1.
>>>>>>>>>>>>>>> The application also uses another DataStream, call it
>>>>>>>>>>>>>>> DataStream2, which consumes events off a Kafka topic. The elements of
>>>>>>>>>>>>>>> DataStream2 go through a transformation that finally updates a
>>>>>>>>>>>>>>> HashMap (a java.util collection). The Flink application needs to
>>>>>>>>>>>>>>> share this HashMap across the Flink cluster so that the DataStream1
>>>>>>>>>>>>>>> processing can check the state of the values in this collection. Is
>>>>>>>>>>>>>>> there a way to do this in Flink?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     I don't see any shared collection that can be used within the
>>>>>>>>>>>>>>> cluster.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>> CVP
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>
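
The key-value state approach suggested in the quoted replies rests on one idea: after keyBy, every record with the same key reaches the same parallel instance, so each instance only holds its own shard of the map and nothing has to be replicated. The following is a minimal plain-Java sketch of that idea and not Flink's implementation. The class name ShardedStateSketch, its method names, and the routing rule (hashCode modulo the parallelism) are assumptions made purely for illustration; Flink's real key-to-instance assignment is internal and works differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: simulates hash-partitioned state without any Flink dependency. */
public class ShardedStateSketch {
    // One map per simulated parallel instance; each holds a disjoint subset of the keys.
    private final List<Map<String, String>> shards = new ArrayList<>();

    public ShardedStateSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            shards.add(new HashMap<>());
        }
    }

    /** Hypothetical routing rule: the same key always maps to the same shard index. */
    public int shardFor(String key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    /** An update from the low-volume stream; a null value removes the entry. */
    public void update(String key, String value) {
        Map<String, String> shard = shards.get(shardFor(key));
        if (value == null) {
            shard.remove(key);
        } else {
            shard.put(key, value);
        }
    }

    /** A lookup from the high-volume stream touches only the shard that owns the key. */
    public String lookup(String key) {
        return shards.get(shardFor(key)).get(key);
    }

    /** Total number of entries across all shards. */
    public int totalEntries() {
        int total = 0;
        for (Map<String, String> shard : shards) {
            total += shard.size();
        }
        return total;
    }

    public static void main(String[] args) {
        ShardedStateSketch state = new ShardedStateSketch(4);
        state.update("road-17", "blocked");
        System.out.println(state.lookup("road-17")); // prints "blocked"
        state.update("road-17", null);               // explicit delete record
        System.out.println(state.lookup("road-17")); // prints "null"
    }
}
```

Because both the update and the lookup use the same routing rule, a single-key lookup never needs data from another shard. A lookup that needs several keys at once does not have this property, which is the limitation raised in the thread.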

Re: Sharing Java Collections within Flink Cluster

Posted by Chakravarthy varaga <ch...@gmail.com>.
PFA, Flink_checkpoint_time.png in relation to this issue.


Re: Sharing Java Collections within Flink Cluster

Posted by Chakravarthy varaga <ch...@gmail.com>.
Hi Aljoscha & Fabian,

    Finally I got this working. Thanks for your help. In terms of
persisting the state (for S2), I tried checkpointing every 10 seconds
using a FsStateBackend. What I notice is that the checkpoint duration is
almost 2 minutes in many cases, while in the other cases it frequently
varies from 100 ms to 1.5 minutes.

    The pseudocode is as below:

     KeyedStream<String, String> ks1 = ds1.keyBy("*");
     KeyedStream<Tuple2<String, V>, String> ks2 =
         ds2.flatMap(split T into k-v pairs).keyBy(0);

     ks1.connect(ks2).flatMap(X);
     // X is a CoFlatMapFunction that inserts and removes elements from
     // ks2 into a key-value state member. Elements from ks1 are matched
     // against that state.

     // ks1 streams about 100K events/sec from a Kafka topic.
     // ks2 streams about 1 event every 10 minutes. Precisely when the
     // 1st event is consumed from this stream, the checkpoint takes
     // 2 minutes straightaway.

    The Flink version is 1.1.2.

 Best Regards
CVP
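
The logic of the two-input function X in the pseudocode can be sketched in plain Java as follows. This is a simulation under stated assumptions, not the code from this job: the class CoFlatMapSketch and the "key=value" output format are hypothetical, and a HashMap stands in for the keyed state, which in Flink is a ValueState that the runtime scopes to the key of the record being processed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: mimics the two callbacks of a CoFlatMapFunction without Flink. */
public class CoFlatMapSketch {
    // Stand-in for keyed state (assumption: one value per key, as with a ValueState).
    private final Map<String, String> stateByKey = new HashMap<>();

    /** Input 1 (ks1, high volume): emit a match only if state exists for this key. */
    public void flatMap1(String key, List<String> out) {
        String value = stateByKey.get(key);
        if (value != null) {
            out.add(key + "=" + value);
        }
    }

    /** Input 2 (ks2, low volume): insert or replace the value; null acts as a delete record. */
    public void flatMap2(String key, String value) {
        if (value == null) {
            stateByKey.remove(key);
        } else {
            stateByKey.put(key, value);
        }
    }

    public static void main(String[] args) {
        CoFlatMapSketch x = new CoFlatMapSketch();
        List<String> out = new ArrayList<>();

        x.flatMap1("road-17", out);          // no state yet, nothing is emitted
        System.out.println(out);             // prints "[]"

        x.flatMap2("road-17", "blocked");    // ks2 event populates the state
        x.flatMap1("road-17", out);          // ks1 event now finds a match
        System.out.println(out);             // prints "[road-17=blocked]"
    }
}
```

In the real job the map would be replaced by the ValueState obtained in open(), and Flink would call the two methods with records from the two connected keyed streams.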

On Tue, Sep 13, 2016 at 7:29 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> you don't need the BlockedEventState class; you should be able to just do
> this:
>
>     private transient ValueState<BlockedRoadInfo> blockedRoads;
>     ............
>     @Override
>     public void open(final org.apache.flink.configuration.Configuration parameters)
>             throws Exception {
>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>                 new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>                         TypeInformation.of(BlockedRoadInfo.class), null);
>         blockedRoads = getRuntimeContext().getState(blockedStateDesc);
>     }
>
> }
>
> Cheers,
> Aljoscha
>
>
> On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga <ch...@gmail.com>
> wrote:
>
>> Hi Fabian,
>>
>>     I'm coding to check whether your proposal works and have hit a
>> ClassCastException.
>>
>>
>>     // Here is my Value that has state information.....an implementation
>> of my value state... where the key is a Double value... on connected stream
>> ks2
>>
>>     public class BlockedEventState implements ValueState<BlockedRoadInfo>
>> {
>>
>>     public BlockedRoadInfo blockedRoad;
>>
>>     @Override
>>     public void clear() {
>>         blockedRoad = null;
>>
>>     }
>>
>>     @Override
>>     public BlockedRoadInfo value() throws IOException {
>>         return blockedRoad;
>>     }
>>
>>     @Override
>>     public void update(final BlockedRoadInfo value) throws IOException {
>>         blockedRoad = value;
>>     }
>> }
>>
>>        //BlockedRoadInfo class...
>>         public class BlockedRoadInfo {
>>             long maxLink;
>>             long minLink;
>>             double blockedEventId;
>>     ....setters & ... getters
>> }
>>
>> /// new RichCoFlatMapFunction() {
>>
>>     private transient BlockedEventState blockedRoads;
>>     ............
>>     @Override
>>     public void open(final org.apache.flink.configuration.Configuration parameters)
>>             throws Exception {
>>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>>                 new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>>                         TypeInformation.of(BlockedRoadInfo.class), null);
>>         // FAILS HERE WITH CLASSCAST
>>         blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc);
>>     }
>>
>> }
>>
>>
>>
>>
>> Caused by: java.lang.ClassCastException:
>> org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to
>> com.ericsson.components.aia.iot.volvo.state.BlockedEventState
>>
>>
>>
>> I have tried to set the state backend to both MemState and FsState:
>> streamEnv.setStateBackend(new
>> FsStateBackend("file:///tmp/flink/checkpoints"));
>>
>>
>>
>> On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Not sure if I got your requirements right, but would this work?
>>>
>>> KeyedStream<String, String> ks1 = ds1.keyBy("*") ;
>>> KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into
>>> k-v pairs).keyBy(0);
>>>
>>> ks1.connect(ks2).flatMap(X)
>>>
>>> X is a CoFlatMapFunction that inserts and removes elements from ks2 into
>>> a key-value state member. Elements from ks1 are matched against that state.
>>>
>>> Best, Fabian
>>>
>>> 2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <chakravarthyvp@gmail.com
>>> >:
>>>
>>>> Hi Fabian,
>>>>
>>>>      First of all, thanks for all your prompt responses. With regard to
>>>> 2) Multiple lookups, I have to clarify what I mean by that...
>>>>
>>>>      DS1<String> elementKeyStream  = stream1.map(String<>); this maps
>>>> each of the streaming elements into string mapped value...
>>>>      DS2<T>  = stream2.xxx(); // where stream2 is a kafka source
>>>> stream, as you proposed.. xxx() should be my function() which splits the
>>>> string and generates key1:<value1>, key2:<value2>, key3:<value3>
>>>> ....keyN:<value4>
>>>>
>>>>      Now,
>>>>         I wish to map elementKeyStream with look ups within (key1,
>>>> key2...keyN) where key1, key2.. keyN and their respective values should be
>>>> available across the cluster...
>>>>
>>>> Thanks a million !
>>>> CVP
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> That depends.
>>>>> 1) Growing/Shrinking: This should work. New entries can always be
>>>>> inserted. In order to remove entries from the k-v-state you have to set the
>>>>> value to null. Note that you need an explicit delete-value record to
>>>>> trigger the eviction.
>>>>> 2) Multiple lookups: This only works if all lookups are independent of
>>>>> each other. You can partition DS1 on a single key only, and the other
>>>>> keys might be located on different shards. A workaround might be to
>>>>> duplicate each DS1 event for every key that it needs to look up. However,
>>>>> you might then need to collect the results that stem from the same DS1
>>>>> event after the join. If that does not work for you, the only thing that
>>>>> comes to my mind is to broadcast the state and keep a full local copy in
>>>>> each operator.
>>>>>
>>>>> Let me add one more thing regarding the upcoming rescaling feature. If
>>>>> this is interesting for you, rescaling will also work (maybe not in the
>>>>> first version) for broadcasted state, i.e. state which is the same on all
>>>>> parallel operator instances.
>>>>>
>>>>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <
>>>>> chakravarthyvp@gmail.com>:
>>>>>
>>>>>> I understand this better with your explanation.
>>>>>> In this use case, each element in DS1 has to be looked up against a
>>>>>> 'bunch of keys' from DS2, and DS2 could shrink or expand in terms of the
>>>>>> number of keys. Will the key-value shard work in this case?
>>>>>>
>>>>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fh...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Operator state is always local in Flink. However, with key-value
>>>>>>> state, you can have something which behaves similarly to a distributed
>>>>>>> hashmap, because each operator holds a different shard/partition of the
>>>>>>> hashtable.
>>>>>>>
>>>>>>> If you have to do only a single key lookup for each element of DS1,
>>>>>>> you should think about partitioning both streams (keyBy) and writing the
>>>>>>> state into Flink's key-value state [1].
>>>>>>>
>>>>>>> This will have several benefits:
>>>>>>> 1) State does not need to be replicated
>>>>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can
>>>>>>> reside on disk. You are not bound to the memory of the JVM.
>>>>>>> 3) Flink takes care of the look-up. No need to have your own hashmap.
>>>>>>> 4) It will only be possible to rescale jobs with key-value state
>>>>>>> (this feature is currently under development).
>>>>>>>
>>>>>>> If using the key-value state is possible, I'd go for that.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>>>>
>>>>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <
>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>
>>>>>>>> Certainly, that is what I thought as well.
>>>>>>>> The output of DataStream2 could be in the 1000s, and there are state
>>>>>>>> updates.
>>>>>>>> Reading this topic from the other job, Job1, is okay.
>>>>>>>> However, assuming that we maintain this state in a collection and
>>>>>>>> update it (by reading from the topic), will this collection be
>>>>>>>> replicated across the cluster within Job1?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fh...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the
>>>>>>>>> other job an option?
>>>>>>>>>
>>>>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <
>>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi Fabian,
>>>>>>>>>>
>>>>>>>>>>     Thanks for your response. These DataStreams
>>>>>>>>>> (Job1-DataStream1 & Job2-DataStream2) belong to different Flink
>>>>>>>>>> applications running within the same cluster.
>>>>>>>>>>     DataStream2 (from Job2) applies transformations and updates a
>>>>>>>>>> 'cache' that Job1 needs to work on.
>>>>>>>>>>     Our intention is not to use an external key/value store, as
>>>>>>>>>> we are trying to localize the cache within the cluster.
>>>>>>>>>>     Is there a way?
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>> CVP
>>>>>>>>>>
>>>>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fh...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Flink does not provide shared state.
>>>>>>>>>>> However, you can broadcast a stream to a CoFlatMapFunction, such
>>>>>>>>>>> that each operator instance has its own local copy of the state.
>>>>>>>>>>>
>>>>>>>>>>> If that does not work for you because the state is too large and
>>>>>>>>>>> if it is possible to partition the state (and both streams), you can also
>>>>>>>>>>> use keyBy instead of broadcast.
>>>>>>>>>>>
>>>>>>>>>>> Finally, you can use an external system like a KeyValue Store or
>>>>>>>>>>> In-Memory store like Apache Ignite to hold your distributed collection.
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>
>>>>>>>>>>>>      Can someone help me here? Appreciate any response !
>>>>>>>>>>>>
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>> Varaga
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
>>>>>>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>
>>>>>>>>>>>>>     I'm working on a Flink Streaming application. The data is
>>>>>>>>>>>>> injected through Kafka connectors. The payload volume is roughly 100K/sec.
>>>>>>>>>>>>> The event payload is a string. Let's call this DataStream1.
>>>>>>>>>>>>> This application also uses another DataStream, call it
>>>>>>>>>>>>> DataStream2, which consumes events off a Kafka topic. The elements of
>>>>>>>>>>>>> DataStream2 go through a certain transformation that finally updates a
>>>>>>>>>>>>> HashMap (or another java.util collection). The Flink application should
>>>>>>>>>>>>> share this HashMap across the Flink cluster so that the DataStream1
>>>>>>>>>>>>> application can check the state of the values in this collection. Is
>>>>>>>>>>>>> there a way to do this in Flink?
>>>>>>>>>>>>>
>>>>>>>>>>>>>     I don't see any shared collection used within the cluster.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>> CVP
>>>>>>>>>>>>>

Re: Sharing Java Collections within Flink Cluster

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
you don't need the BlockedEventState class. Flink creates the ValueState
implementation itself (FsValueState in your stack trace), so declare the
field with the ValueState interface type and drop the cast. You should be
able to just do this:

    private transient ValueState<BlockedRoadInfo> blockedRoads;
    ............
    @Override
    public void open(final org.apache.flink.configuration.Configuration parameters)
            throws Exception {
        final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
                new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
                        TypeInformation.of(BlockedRoadInfo.class), null);
        blockedRoads = getRuntimeContext().getState(blockedStateDesc);
    }

Cheers,
Aljoscha
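
For readers following along, the failure and the fix can be reproduced
without Flink. The sketch below is a minimal, Flink-free model, and every
name in it (SimpleValueState, BackendValueState, UserValueState, getState) is
a hypothetical stand-in, not a Flink class. It only illustrates the general
Java rule at work here: a factory that returns an interface type may hand
back any implementation, so the caller has to hold the result by the
interface instead of casting it to its own class.

```java
/** Flink-free model of the cast problem; every name here is hypothetical. */
final class StateCastSketch {

    /** Stands in for the ValueState interface. */
    interface SimpleValueState<T> {
        T value();
        void update(T value);
        void clear();
    }

    /** Stands in for the implementation chosen by the state backend. */
    static final class BackendValueState<T> implements SimpleValueState<T> {
        private T current;
        @Override public T value() { return current; }
        @Override public void update(T value) { current = value; }
        @Override public void clear() { current = null; }
    }

    /** Stands in for a user-written implementation such as BlockedEventState. */
    static final class UserValueState<T> implements SimpleValueState<T> {
        private T current;
        @Override public T value() { return current; }
        @Override public void update(T value) { current = value; }
        @Override public void clear() { current = null; }
    }

    /** Stands in for the runtime's state lookup: the runtime picks the class. */
    static <T> SimpleValueState<T> getState() {
        return new BackendValueState<>();
    }

    /** Returns true if casting the runtime's state to the user class fails. */
    static boolean castToUserClassFails() {
        SimpleValueState<String> state = getState();
        try {
            UserValueState<String> mine = (UserValueState<String>) state;
            return mine == null; // not reached: the cast above throws
        } catch (ClassCastException e) {
            return true;
        }
    }

    /** Holding the state by its interface works with any implementation. */
    static String roundTripThroughInterface(String v) {
        SimpleValueState<String> state = getState();
        state.update(v);
        return state.value();
    }

    public static void main(String[] args) {
        System.out.println(castToUserClassFails());               // prints "true"
        System.out.println(roundTripThroughInterface("blocked")); // prints "blocked"
    }
}
```

In the real job the same idea means keeping the field typed as
ValueState<BlockedRoadInfo>, as in the snippet above.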


On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga <ch...@gmail.com>
wrote:

> Hi Fabian,
>
>     I'm coding up your proposal to check whether it works, and I have hit a
> ClassCastException.
>
>
>     // Here is my value-state implementation that holds the state information
>     // for the connected stream ks2, where the key is a Double value.
>
>     public class BlockedEventState implements ValueState<BlockedRoadInfo> {
>
>         public BlockedRoadInfo blockedRoad;
>
>         @Override
>         public void clear() {
>             blockedRoad = null;
>         }
>
>         @Override
>         public BlockedRoadInfo value() throws IOException {
>             return blockedRoad;
>         }
>
>         @Override
>         public void update(final BlockedRoadInfo value) throws IOException {
>             blockedRoad = value;
>         }
>     }
>
>     // BlockedRoadInfo class...
>     public class BlockedRoadInfo {
>         long maxLink;
>         long minLink;
>         double blockedEventId;
>         // ... setters & getters
>     }
>
>     // new RichCoFlatMapFunction() {
>
>         private transient BlockedEventState blockedRoads;
>         ............
>         @Override
>         public void open(final org.apache.flink.configuration.Configuration parameters)
>                 throws Exception {
>             final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>                     new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>                             TypeInformation.of(BlockedRoadInfo.class), null);
>             // FAILS HERE WITH CLASSCAST
>             blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc);
>         }
>
>     }
>
>
>
>
> Caused by: java.lang.ClassCastException:
> org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to
> com.ericsson.components.aia.iot.volvo.state.BlockedEventState
>
> I have tried to set the state backend to both MemState and FsState:
> streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));

Re: Sharing Java Collections within Flink Cluster

Posted by Chakravarthy varaga <ch...@gmail.com>.
Hi Fabian,

    I'm coding up your proposal to check whether it works, and I have hit a
ClassCastException.


    // Here is my value-state implementation that holds the state information
    // for the connected stream ks2, where the key is a Double value.

    public class BlockedEventState implements ValueState<BlockedRoadInfo> {

        public BlockedRoadInfo blockedRoad;

        @Override
        public void clear() {
            blockedRoad = null;
        }

        @Override
        public BlockedRoadInfo value() throws IOException {
            return blockedRoad;
        }

        @Override
        public void update(final BlockedRoadInfo value) throws IOException {
            blockedRoad = value;
        }
    }

    // BlockedRoadInfo class...
    public class BlockedRoadInfo {
        long maxLink;
        long minLink;
        double blockedEventId;
        // ... setters & getters
    }

    // new RichCoFlatMapFunction() {

        private transient BlockedEventState blockedRoads;
        ............
        @Override
        public void open(final org.apache.flink.configuration.Configuration parameters)
                throws Exception {
            final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
                    new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
                            TypeInformation.of(BlockedRoadInfo.class), null);
            // FAILS HERE WITH CLASSCAST
            blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc);
        }

    }




Caused by: java.lang.ClassCastException:
org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to
com.ericsson.components.aia.iot.volvo.state.BlockedEventState

I have tried to set the state backend to both MemState and FsState:
streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));



On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Not sure if I got your requirements right, but would this work?
>
> KeyedStream<String, String> ks1 = ds1.keyBy("*");
> KeyedStream<Tuple2<String, V>, String> ks2 =
>     ds2.flatMap(/* split T into k-v pairs */).keyBy(0);
>
> ks1.connect(ks2).flatMap(X);
>
> X is a CoFlatMapFunction that uses the elements from ks2 to insert entries
> into, and remove entries from, a key-value state member. Elements from ks1
> are matched against that state.
>
> Best, Fabian

Re: Sharing Java Collections within Flink Cluster

Posted by Fabian Hueske <fh...@gmail.com>.
Not sure if I got your requirements right, but would this work?

KeyedStream<String, String> ks1 = ds1.keyBy("*");
KeyedStream<Tuple2<String, V>, String> ks2 =
    ds2.flatMap(/* split T into k-v pairs */).keyBy(0);

ks1.connect(ks2).flatMap(X);

X is a CoFlatMapFunction that uses the elements from ks2 to insert entries
into, and remove entries from, a key-value state member. Elements from ks1
are matched against that state.

Best, Fabian
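
The keyed variant above can be modelled in plain Java to see why a
single-key lookup needs no shared collection. This is a simplified sketch
under stated assumptions, not Flink code: KeyedLookupSketch, shardFor,
onStream1 and onStream2 are hypothetical names, and routing by hashCode
modulo the parallelism only approximates whatever partitioning Flink applies
internally. A null value on the second stream plays the role of an explicit
delete record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Flink-free model of keyBy + connect + a co-flat-map; all names are hypothetical. */
final class KeyedLookupSketch {

    private final int parallelism;

    /** One map per parallel instance: each holds only its own shard of the state. */
    private final List<Map<String, String>> shards = new ArrayList<>();

    KeyedLookupSketch(int parallelism) {
        this.parallelism = parallelism;
        for (int i = 0; i < parallelism; i++) {
            shards.add(new HashMap<>());
        }
    }

    /** Models keyBy: the same key is always routed to the same instance. */
    int shardFor(String key) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Models the ks2 side of X: insert or update an entry, or delete it on null. */
    void onStream2(String key, String value) {
        Map<String, String> state = shards.get(shardFor(key));
        if (value == null) {
            state.remove(key);
        } else {
            state.put(key, value);
        }
    }

    /** Models the ks1 side of X: look the key up in the local shard only. */
    String onStream1(String key) {
        return shards.get(shardFor(key)).get(key);
    }

    public static void main(String[] args) {
        KeyedLookupSketch job = new KeyedLookupSketch(4);
        job.onStream2("road-7", "blocked");          // insert from stream 2
        System.out.println(job.onStream1("road-7")); // prints "blocked"
        job.onStream2("road-7", null);               // explicit delete record
        System.out.println(job.onStream1("road-7")); // prints "null"
    }
}
```

Because both streams are routed by the same key, a lookup and the update it
depends on always land on the same instance, so no instance ever needs
another instance's shard.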

2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <ch...@gmail.com>:

> Hi Fabian,
>
>      First of all, thanks for all your prompt responses. With regard to 2)
> Multiple lookups, I have to clarify what I mean by that.
>
>      DS1<String> elementKeyStream = stream1.map(String<>); // maps each
> streaming element to a String value
>      DS2<T> = stream2.xxx(); // stream2 is a Kafka source stream, as you
> proposed; xxx() is my function, which splits the string and generates
> key1:<value1>, key2:<value2>, key3:<value3> ... keyN:<valueN>
>
>      Now, I wish to map elementKeyStream with lookups against (key1,
> key2 ... keyN), where key1, key2 ... keyN and their respective values
> should be available across the cluster.
>
> Thanks a million !
> CVP

Re: Sharing Java Collections within Flink Cluster

Posted by Chakravarthy varaga <ch...@gmail.com>.
Hi Fabian,

     First of all, thanks for all your prompt responses. Regarding 2)
Multiple lookups, let me clarify what I mean.

     DS1<String> elementKeyStream = stream1.map(...); // maps each streaming
element to a String value
     DS2<T> = stream2.xxx(); // stream2 is a Kafka source stream, as you
proposed; xxx() is my function, which splits the string and generates
key1:<value1>, key2:<value2>, key3:<value3> ... keyN:<valueN>

     Now, I wish to map each element of elementKeyStream using lookups
against (key1, key2 ... keyN), where key1, key2 ... keyN and their respective
values should be available across the cluster.

Thanks a million !
CVP




On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fh...@gmail.com> wrote:

> That depends.
> 1) Growing/Shrinking: This should work. New entries can always be
> inserted. In order to remove entries from the k-v-state you have to set the
> value to null. Note that you need an explicit delete-value record to
> trigger the eviction.
> 2) Multiple lookups: This only works if all lookups are independent of
> each other. You can partition DS1 on a single key only, and the other keys
> might be located on different shards. A workaround might be to duplicate
> each DS1 event once for every key it needs to look up. However, you might
> then need to collect the results that belong to the same DS1 event after
> the join. If that does not work for you, the only thing that comes to my
> mind is to broadcast the state and keep a full local copy in each operator.
>
> Let me add one more thing regarding the upcoming rescaling feature. If
> this is interesting for you, rescaling will also work (maybe not in the
> first version) for broadcasted state, i.e. state which is the same on all
> parallel operator instances.

Re: Sharing Java Collections within Flink Cluster

Posted by Fabian Hueske <fh...@gmail.com>.
That depends.
1) Growing/Shrinking: This should work. New entries can always be inserted.
In order to remove entries from the k-v-state you have to set the value to
null. Note that you need an explicit delete-value record to trigger the
eviction.
2) Multiple lookups: This only works if all lookups are independent of each
other. You can partition DS1 on a single key only, and the other keys might
be located on different shards. A workaround might be to duplicate each DS1
event once for every key it needs to look up. However, you might then need
to collect the results that belong to the same DS1 event after the join. If
that does not work for you, the only thing that comes to my mind is to
broadcast the state and keep a full local copy in each operator.
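
To illustrate points 1) and 2), here is a minimal sketch in plain Java. It is
a conceptual model and does not use the Flink API: the class
FanOutLookupSketch, its method names, and the hash-based choice of shard are
assumptions made purely for illustration. It shows a null value acting as the
delete record, and one event being fanned out into one lookup per key before
the hits are collected again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only, not Flink API: each list entry stands for the
// key-value state held by one parallel operator instance.
class FanOutLookupSketch {
    private final List<Map<String, String>> shards = new ArrayList<>();

    FanOutLookupSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            shards.add(new HashMap<>());
        }
    }

    // Assumption: keys are routed by hash, so one key always maps to one shard.
    private Map<String, String> shardFor(String key) {
        return shards.get(Math.floorMod(key.hashCode(), shards.size()));
    }

    // DS2 record: a null value acts as the explicit delete record.
    void update(String key, String valueOrNull) {
        if (valueOrNull == null) {
            shardFor(key).remove(key);
        } else {
            shardFor(key).put(key, valueOrNull);
        }
    }

    // DS1 event that needs several keys: fan out into one lookup per key,
    // then collect the hits that belong to the same event again.
    Map<String, String> lookupAll(List<String> keysOfOneEvent) {
        Map<String, String> collected = new LinkedHashMap<>();
        for (String key : keysOfOneEvent) {
            String value = shardFor(key).get(key);
            if (value != null) {
                collected.put(key, value);
            }
        }
        return collected;
    }
}
```

In a real job the per-key lookups would run on different parallel instances,
so the "collect" step would be a second keyed operation on some event
identifier rather than a simple loop.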

Let me add one more thing regarding the upcoming rescaling feature. If this
is interesting for you, rescaling will also work (maybe not in the first
version) for broadcasted state, i.e. state which is the same on all
parallel operator instances.

2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <ch...@gmail.com>:

> I understand this better with your explanation.
> In this use case, each element in DS1 has to be looked up against a 'bunch
> of keys' from DS2, and DS2 could shrink or expand in terms of the number of
> keys. Will the key-value sharding work in this case?

Re: Sharing Java Collections within Flink Cluster

Posted by Chakravarthy varaga <ch...@gmail.com>.
I understand this better with your explanation.
In this use case, each element in DS1 has to be looked up against a 'bunch
of keys' from DS2, and DS2 could shrink or expand in terms of the number of
keys. Will the key-value sharding work in this case?

On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Operator state is always local in Flink. However, with key-value state,
> you can have something which behaves similarly to a distributed hashmap,
> because each operator holds a different shard/partition of the
> hashtable.
>
> If you have to do only a single key lookup for each element of DS1, you
> should think about partitioning both streams (keyBy) and writing the state
> into Flink's key-value state [1].
>
> This will have several benefits:
> 1) State does not need to be replicated
> 2) Depending on the backend (RocksDB) [2], parts of the state can reside
> on disk. You are not bound to the memory of the JVM.
> 3) Flink takes care of the look-up. No need to have your own hashmap.
> 4) It will only be possible to rescale jobs with key-value state (this
> feature is currently under development).
>
> If using the key-value state is possible, I'd go for that.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.1/apis/streaming/state.html
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.1/apis/streaming/state_backends.html

Re: Sharing Java Collections within Flink Cluster

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Pushpendra,

1. Queryable state is an upcoming feature and not part of an official
release yet. With queryable state you can query operator state from outside
the application.

2. Have you had a look at the CoFlatMap operator? This operator "connects"
two streams and allows you to keep state that is accessible from both streams.
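
A minimal sketch of that idea in plain Java follows. It is a conceptual model
and does not use the Flink API: the class ConnectedLookupSketch is
hypothetical, and only the method names flatMap1 and flatMap2 are borrowed
from CoFlatMapFunction. One input writes to the state, the other input reads
from it, which is roughly the "lookup" asked about in the question.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only, not Flink API: one operator instance with two inputs
// that share the same state. The method names mirror CoFlatMapFunction.
class ConnectedLookupSketch {
    private final Map<String, String> state = new HashMap<>();

    // Input 1 (the keyed stream): remember the latest value per key.
    void flatMap1(String key, String value) {
        state.put(key, value);
    }

    // Input 2 (the other stream): look up the message key in the shared state
    // and emit a result only if the key is known.
    void flatMap2(String messageKey, List<String> out) {
        String value = state.get(messageKey);
        if (value != null) {
            out.add(messageKey + " -> " + value);
        }
    }
}
```

Note that a message arriving on the second input before its key has been seen
on the first input finds nothing, so the order in which the two streams are
consumed matters.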

Best, Fabian

2016-09-08 8:54 GMT+02:00 pushpendra.jaiswal <pushpendra.jaiswal90@gmail.com
>:

> Hi Fabian
> I am also looking for this solution. Could you help me with two things?
>
> 1. How is this different from queryable state?
>
> 2. How can I query this key-value state from DS2, even if it is running in
> the same application?
>
> e.g.
>
> val keyedStream = stream.keyBy(_.key)
> val otherStream = somekafka.createStream
>
> The final goal is to have something like:
>
> otherStream.foreach(kafkamessage => keyedStream.lookup(kafkamessage.key))
>
> ~Pushpendra Jaiswal

Re: Sharing Java Collections within Flink Cluster

Posted by "pushpendra.jaiswal" <pu...@gmail.com>.
Hi Fabian
I am also looking for this solution. Could you help me with two things?

1. How is this different from queryable state?

2. How can I query this key-value state from DS2, even if it is running in
the same application?

e.g.

val keyedStream = stream.keyBy(_.key)
val otherStream = somekafka.createStream

The final goal is to have something like:

otherStream.foreach(kafkamessage => keyedStream.lookup(kafkamessage.key))

~Pushpendra Jaiswal





--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Sharing-Java-Collections-within-Flink-Cluster-tp8919p8965.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Sharing Java Collections within Flink Cluster

Posted by Fabian Hueske <fh...@gmail.com>.
Operator state is always local in Flink. However, with key-value state, you
can have something which behaves similarly to a distributed hashmap,
because each operator holds a different shard/partition of the hashtable.

If you have to do only a single key lookup for each element of DS1, you
should think about partitioning both streams (keyBy) and writing the state
into Flink's key-value state [1].

This will have several benefits:
1) State does not need to be replicated
2) Depending on the backend (RocksDB) [2], parts of the state can reside on
disk. You are not bound to the memory of the JVM.
3) Flink takes care of the look-up. No need to have your own hashmap.
4) It will only be possible to rescale jobs with key-value state (this
feature is currently under development).

If using the key-value state is possible, I'd go for that.
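
As a rough illustration of the "distributed hashmap" behaviour, here is a
minimal model in plain Java. It does not use the Flink API: the class
ShardedStateSketch, its methods, and the hash-modulo routing are assumptions
made for illustration. The point is only that a DS2 update and a DS1 lookup
for the same key reach the same shard, so nothing has to be replicated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only, not Flink API: N parallel "operator instances",
// each owning one shard of the overall map.
class ShardedStateSketch {
    private final List<Map<String, String>> shards = new ArrayList<>();

    ShardedStateSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            shards.add(new HashMap<>());
        }
    }

    // Assumption: the same key always maps to the same shard, which is the
    // guarantee that partitioning both streams by key is meant to give.
    int shardFor(String key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    // A DS2 record updates the state of the instance responsible for its key.
    void onStream2(String key, String value) {
        shards.get(shardFor(key)).put(key, value);
    }

    // A DS1 record keyed the same way reaches the same instance, so the
    // lookup is local; an unknown key yields null.
    String onStream1(String key) {
        return shards.get(shardFor(key)).get(key);
    }

    int shardSize(int shard) {
        return shards.get(shard).size();
    }
}
```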

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html

2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <ch...@gmail.com>:

> Certainly, that is what I thought as well.
> The output of DataStream2 could be in the 1000s, and there are state updates.
> Reading this topic from the other job, job1, is okay.
> However, assuming that we maintain this state in a collection and update
> that collection by reading from the topic, will it be replicated across
> the cluster within job1?

Re: Sharing Java Collections within Flink Cluster

Posted by Chakravarthy varaga <ch...@gmail.com>.
Certainly, that is what I thought as well.
The output of DataStream2 could be in the 1000s, and there are state updates.
Reading this topic from the other job, job1, is okay.
However, assuming that we maintain this state in a collection and update
that collection by reading from the topic, will it be replicated across the
cluster within job1?



On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Is writing DataStream2 to a Kafka topic and reading it from the other job
> an option?

Re: Sharing Java Collections within Flink Cluster

Posted by Fabian Hueske <fh...@gmail.com>.
Is writing DataStream2 to a Kafka topic and reading it from the other job
an option?

2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <ch...@gmail.com>:

> Hi Fabian,
>
>     Thanks for your response. These DataStreams (Job1-DataStream1 &
> Job2-DataStream2) actually belong to different Flink applications running
> within the same cluster.
>     DataStream2 (from Job2) applies transformations and updates a 'cache'
> that Job1 needs to work on.
>     We would prefer not to use an external key/value store, as we are
> trying to localize the cache within the cluster.
>     Is there a way?
>
> Best Regards
> CVP

Re: Sharing Java Collections within Flink Cluster

Posted by Chakravarthy varaga <ch...@gmail.com>.
Hi Fabian,

    Thanks for your response. These DataStreams (Job1-DataStream1 &
Job2-DataStream2) actually belong to different Flink applications running
within the same cluster.
    DataStream2 (from Job2) applies transformations and updates a 'cache'
that Job1 needs to work on.
    We would prefer not to use an external key/value store, as we are
trying to localize the cache within the cluster.
    Is there a way?

Best Regards
CVP

On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> Flink does not provide shared state.
> However, you can broadcast a stream to a CoFlatMapFunction, so that each
> parallel operator instance has its own local copy of the state.
>
> If that does not work for you because the state is too large and if it is
> possible to partition the state (and both streams), you can also use keyBy
> instead of broadcast.
>
> Finally, you can use an external system such as a key-value store or an
> in-memory store like Apache Ignite to hold your distributed collection.
>
> Best, Fabian
>
> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <ch...@gmail.com>:
>
>> Hi Team,
>>
>>      Can someone help me here? Appreciate any response !
>>
>> Best Regards
>> Varaga
>>
>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
>> chakravarthyvp@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>>     I'm working on a Flink Streaming application. The data is injected
>>> through Kafka connectors. The payload volume is roughly 100K/sec. The event
>>> payload is a string. Let's call this as DataStream1.
>>> This application also uses another DataStream, call it DataStream2,
>>> (consumes events off a kafka topic). The elements of this DataStream2
>>> involves in a certain transformation that finally updates a Hashmap(/Java
>>> util Collection). Apparently the flink application should share this
>>> HashMap across the flink cluster so that DataStream1 application could
>>> check the state of the values in this collection. Is there a way to do this
>>> in Flink?
>>>
>>>     I don't see any Shared Collection used within the cluster?
>>>
>>> Best Regards
>>> CVP
>>>
>>
>>
>

Re: Sharing Java Collections within Flink Cluster

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Flink does not provide shared state.
However, you can broadcast a stream to a CoFlatMapFunction, so that each
parallel operator instance has its own local copy of the state.
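
A minimal sketch of the broadcast idea follows. It is written in plain Java
rather than against the Flink API so that it runs without a cluster, and the
names LocalCopyTask, BroadcastSketch, onUpdate and onEvent are made up for
illustration. It only models the behaviour: every update reaches every
parallel instance, and each instance answers lookups from its own map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of one parallel operator instance (hypothetical, not Flink API). */
class LocalCopyTask {
    // Each instance owns a private copy of the lookup table.
    private final Map<String, String> localState = new HashMap<>();

    /** Called for every element of the broadcast (update) stream. */
    void onUpdate(String key, String value) {
        localState.put(key, value);
    }

    /** Called for the data-stream elements routed to this instance. */
    String onEvent(String key) {
        return key + " -> " + localState.getOrDefault(key, "<unknown>");
    }
}

class BroadcastSketch {
    /** Creates `parallelism` instances and delivers every update to all of them. */
    static List<LocalCopyTask> broadcast(int parallelism, Map<String, String> updates) {
        List<LocalCopyTask> tasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new LocalCopyTask());
        }
        for (Map.Entry<String, String> update : updates.entrySet()) {
            for (LocalCopyTask task : tasks) {
                task.onUpdate(update.getKey(), update.getValue());
            }
        }
        return tasks;
    }
}
```

In an actual job the wiring would look roughly like
ds1.connect(ds2.broadcast()).flatMap(...), with one method of the
CoFlatMapFunction updating the map and the other reading it. Note that the
full collection is then held once per parallel instance.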

If that does not work for you because the state is too large and if it is
possible to partition the state (and both streams), you can also use keyBy
instead of broadcast.
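
As a rough illustration of the keyBy variant, here is another plain-Java
sketch (again not Flink API; KeyPartitionSketch and its methods are
hypothetical names). It assumes a simplified routing rule, hash of the key
modulo the parallelism, whereas Flink's real key assignment is more involved.
The point is only that an update and a lookup for the same key land on the
same instance, so each instance needs just its own slice of the collection.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of key-partitioned state (hypothetical, not Flink API). */
class KeyPartitionSketch {
    // One map per parallel instance; together they hold the whole collection.
    private final List<Map<String, String>> partitions = new ArrayList<>();

    KeyPartitionSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new HashMap<>());
        }
    }

    /** Simplified routing assumption: the same key always maps to the same instance. */
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Update-stream element: stored only on the instance that owns the key. */
    void onUpdate(String key, String value) {
        partitions.get(subtaskFor(key, partitions.size())).put(key, value);
    }

    /** Data-stream element: routed by the same key, so the lookup stays local. */
    String onEvent(String key) {
        return partitions.get(subtaskFor(key, partitions.size()))
                .getOrDefault(key, "<unknown>");
    }

    /** Number of entries held by one instance. */
    int sizeOf(int subtask) {
        return partitions.get(subtask).size();
    }
}
```

This only works if both streams can be keyed on the same attribute; in a
Flink job that would mean calling keyBy on both streams before connecting
them.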

Finally, you can use an external system, such as a key-value store or an
in-memory store like Apache Ignite, to hold your distributed collection.

Best, Fabian

2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <ch...@gmail.com>:

> Hi Team,
>
>      Can someone help me here? Appreciate any response !
>
> Best Regards
> Varaga
>
> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
> chakravarthyvp@gmail.com> wrote:
>
>> Hi Team,
>>
>>     I'm working on a Flink Streaming application. The data is injected
>> through Kafka connectors. The payload volume is roughly 100K/sec. The event
>> payload is a string. Let's call this as DataStream1.
>> This application also uses another DataStream, call it DataStream2,
>> (consumes events off a kafka topic). The elements of this DataStream2
>> involves in a certain transformation that finally updates a Hashmap(/Java
>> util Collection). Apparently the flink application should share this
>> HashMap across the flink cluster so that DataStream1 application could
>> check the state of the values in this collection. Is there a way to do this
>> in Flink?
>>
>>     I don't see any Shared Collection used within the cluster?
>>
>> Best Regards
>> CVP
>>
>
>

Re: Sharing Java Collections within Flink Cluster

Posted by Chakravarthy varaga <ch...@gmail.com>.
Hi Team,

     Can someone help me here? I'd appreciate any response!

Best Regards
Varaga

On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
chakravarthyvp@gmail.com> wrote:

> Hi Team,
>
>     I'm working on a Flink Streaming application. The data is injected
> through Kafka connectors. The payload volume is roughly 100K/sec. The event
> payload is a string. Let's call this as DataStream1.
> This application also uses another DataStream, call it DataStream2,
> (consumes events off a kafka topic). The elements of this DataStream2
> involves in a certain transformation that finally updates a Hashmap(/Java
> util Collection). Apparently the flink application should share this
> HashMap across the flink cluster so that DataStream1 application could
> check the state of the values in this collection. Is there a way to do this
> in Flink?
>
>     I don't see any Shared Collection used within the cluster?
>
> Best Regards
> CVP
>