Posted to user@flink.apache.org by Matt <dr...@gmail.com> on 2017/01/13 02:11:38 UTC

Objects accessible from all Flink nodes

Hello,

I have a stream of objects which I use to update the model of a
classification algorithm and another stream with the objects I need to
classify in real time.

The problem is that the instances for training and evaluation are processed
on potentially different Flink nodes, but the classifier should be applied
to all instances regardless of the node on which they were generated (i.e.,
the classifier should be accessible from any Flink node).

Just to make it clearer, here is what would NOT work since these sink
functions are not serializable:
https://gist.github.com/b979bf742b0d2f3da8cc8e5e91207151

Two questions here:

*1. How can an instance be accessed by any Flink node like this (lines 11
and 19)? Maybe there's a better approach to this problem.*

*2. In the example, the second stream (line 15) is started right away, but
at startup the classifier is not ready to use until it has been trained
with enough instances. Is it possible to handle this? If I'm not mistaken,
env.execute (line 24) can only be called once.*

Regards,
Matt

Re: Objects accessible from all Flink nodes

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Matt,

it is not possible to share an object across different tasks of the same
operator or even across different operators.
This would be globally mutable state, which is generally hard to make
efficient in distributed systems.

Something that might work is to use a CoFlatMap operator (a CoFlatMapFunction
applied to two connected streams) with one input being the training data and
the other the actual input.
Then you can train and query the model in the same operator. You
would have multiple models, one in each parallel task.
If you can partition the training (and input) data in a meaningful way, you
would have a partition- or key-specific model. You can also use random
partitioning and have models which are based on random samples of the
training data. Or, if you want every model to be trained on all of the
training data, you can broadcast the training data.

This would look as follows:

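import org.apache.flink.streaming.api.scala._

// Input, Training, Prediction and xxx are placeholders; YourFlatMap is your
// CoFlatMapFunction[Input, Training, Prediction] that trains and queries the model
val input: DataStream[Input] = ???
val training: DataStream[Training] = ???

// partitioned variant (xxx = key selector)
val partitioned = input.keyBy(xxx).connect(training.keyBy(xxx)).flatMap(new YourFlatMap)
// random variant
val random = input.connect(training.shuffle).flatMap(new YourFlatMap)
// broadcast variant
val broadcasted = input.connect(training.broadcast).flatMap(new YourFlatMap)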

An example of a CoFlatMap which trains and queries a prediction model can
be found in the Flink Training [1] (code [2]).

Hope this helps,
Fabian

[1] http://dataartisans.github.io/flink-training/exercises/timePrediction.html
[2] https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/state/TravelTimePrediction.java
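The train-and-query approach described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the Flink Training code: the record types `Labeled`, `Query` and `Prediction`, the `minTrainingSize` threshold and the nearest-centroid classifier are hypothetical stand-ins for the real classifier, and the logic is kept free of Flink types so it runs on its own. It also touches on question 2: queries that arrive before the model has seen `minTrainingSize` training instances are buffered and classified once the model becomes ready, so both streams can start within a single `env.execute` call.

```scala
import scala.collection.mutable

// Hypothetical record types, for illustration only.
final case class Labeled(feature: Double, label: String) // element of the training stream
final case class Query(id: Long, feature: Double)        // element of the stream to classify
final case class Prediction(id: Long, label: String)

// Per-task train-and-query logic, kept free of Flink types.
// In a CoFlatMapFunction, flatMap1 would call onQuery and flatMap2 would call onTraining.
final class TrainThenPredict(minTrainingSize: Int) {
  require(minTrainingSize > 0, "need at least one training instance")

  // Nearest-centroid model (hypothetical stand-in classifier): feature sum and count per label.
  private val sums = mutable.Map.empty[String, Double]
  private val counts = mutable.Map.empty[String, Int]
  private var seen = 0
  // Queries that arrive before the model is ready are held back here.
  private val pending = mutable.Queue.empty[Query]

  private def ready: Boolean = seen >= minTrainingSize

  // Pick the label whose mean feature value is closest to the query's feature.
  private def classify(q: Query): Prediction = {
    val best = counts.keys.minBy(l => math.abs(sums(l) / counts(l) - q.feature))
    Prediction(q.id, best)
  }

  // Update the model; as soon as it is ready, flush the buffered queries.
  def onTraining(x: Labeled): Seq[Prediction] = {
    sums(x.label) = sums.getOrElse(x.label, 0.0) + x.feature
    counts(x.label) = counts.getOrElse(x.label, 0) + 1
    seen += 1
    if (ready && pending.nonEmpty) {
      val out = pending.toList.map(classify)
      pending.clear()
      out
    } else Seq.empty
  }

  // Classify right away if the model is ready; otherwise buffer the query.
  def onQuery(q: Query): Seq[Prediction] =
    if (ready) Seq(classify(q))
    else {
      pending.enqueue(q)
      Seq.empty
    }
}
```

Inside Flink, a `CoFlatMapFunction[Query, Labeled, Prediction]` could hold one `TrainThenPredict` instance and delegate to it, e.g. `onQuery(value).foreach(out.collect)` in `flatMap1` and `onTraining(value).foreach(out.collect)` in `flatMap2`, so each parallel task keeps its own model. The buffer here is plain memory, unbounded and not checkpointed; a real job would likely keep it (and the model) in Flink's managed state.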


2017-01-13 15:38 GMT+01:00 Matt <dr...@gmail.com>:

> Errata: How can an *object (such as the classifier, line 1)* be accessed
> by any Flink node [...]
>
> Just in case: I believe the classifier itself can't be serialized; it's
> part of a framework which I can't modify. In any case, even if it could be
> serialized, I guess the cost of moving it from one node to another would
> make the whole data flow impractical. It's better to move all created
> instances to one single node where only one instance of the classifier
> is maintained.
>
> I'm not sure if this is possible or how to do this.
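Regarding a classifier that cannot be serialized: Flink ships user functions to the workers in serialized form, so a non-serializable object should be created on the worker rather than captured in the function. The usual Flink way is to create it in the `open()` method of a rich function (e.g. a `RichCoFlatMapFunction`) and keep the field `@transient`. The sketch below imitates this without a Flink dependency, under these assumptions: `ThirdPartyClassifier` is a hypothetical stand-in for the framework class, `ClassifyFn` plays the role of the user function, and a Java serialization round trip stands in for deployment. The deserialized copy builds its own fresh classifier, which is why each parallel task ends up with a separate model.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for the third-party classifier; deliberately NOT Serializable.
final class ThirdPartyClassifier {
  private var trained = 0
  def train(x: Double): Unit = trained += 1
  def classify(x: Double): String = if (x >= 0.5) "b" else "a"
  def trainedCount: Int = trained
}

// Plays the role of a user function that gets serialized and shipped to workers.
final class ClassifyFn extends Serializable {
  // @transient: skipped by Java serialization, so the function itself stays serializable.
  @transient private var classifier: ThirdPartyClassifier = null

  // Create the classifier on first use in each copy (in Flink, typically done in open()).
  private def model: ThirdPartyClassifier = {
    if (classifier == null) classifier = new ThirdPartyClassifier
    classifier
  }

  def train(x: Double): Unit = model.train(x)
  def classify(x: Double): String = model.classify(x)
  def trainedCount: Int = model.trainedCount
}

object SerializationRoundTrip {
  // Serialize and deserialize an object, roughly what happens when a function is deployed.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with the object's own class loader (helps in REPL/script runners).
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, obj.getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

If a single classifier instance is really required, the operator that holds it could be run with `setParallelism(1)`, so that all records are routed to one task; this gives up parallelism for that operator and may become a throughput bottleneck.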
