Posted to user@flink.apache.org by Theodore Vasiloudis <th...@gmail.com> on 2016/11/21 11:06:44 UTC

Understanding connected streams use without timestamps

Hello all,

I was playing around with the IncrementalLearningSkeleton example [1] and I
had a couple of questions regarding the behavior of connected streams.

In the example, the elements are assigned timestamps, and a stream called
model produces Double[] elements by ingesting and processing a stream of
Integer training data points.

DataStream<Double[]> model = trainingData
                .assignTimestampsAndWatermarks(new LinearTimestamp())
                .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
                .apply(new PartialModelBuilder());

The model stream is then connected to a newData stream, which lets us use the
constantly updated model to make predictions for the incoming newData
elements. This works through a model variable shared between the two map
functions of the CoMap class. The shared variable starts out as null and is
updated every time an element from the model stream arrives.

DataStream<Integer> prediction = newData.connect(model).map(new Predictor());

My confusion arose when I tried a slightly different approach [2], without
timestamps or watermarks. In my example I simply create count windows of,
say, 100 elements, and I use readTextFile to read in both trainingData and
newData:

DataStream<ArrayList<Double>> model = trainingData
        .countWindowAll(100)
        .apply(new PartialModelBuilder());

When I then connect the model stream to the newData stream, the map1
function of the CoMap never sees a non-null model. It seems that the map
functions are executed in sequence: first map1 is executed for all the
newData elements, and only then is map2 executed for all the model elements.

So how does having or not having timestamps affect the behavior of the
connected stream?

How would I handle such a case if the notion of timestamps does not apply
for my data?
(Here I'm interested in streaming historical data, and I assume its order
does not matter.)


[1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java

[2] https://gist.github.com/thvasilo/67bcb9370b03971f380ae43c4ae6e2d0

Re: Understanding connected streams use without timestamps

Posted by Aljoscha Krettek <al...@apache.org>.
Nope, not right now, but this is pretty much what we're trying to solve with
side inputs:
https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit
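
Until something like side inputs exists, one workaround that is sometimes
used (an assumption here, not something proposed in this thread) is to buffer
the newData elements inside the co-function until the first model arrives,
and then flush them. The sketch below is plain Java with no Flink dependency:
the class name BufferingPredictorSketch, the Consumer used in place of a
Collector, and the toy predict() rule are all hypothetical. The two methods
only mirror the flatMap1/flatMap2 shape of a co-flat-map function.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Flink-free stand-in for a co-flat-map function (hypothetical names throughout).
// flatMap1 receives newData elements, flatMap2 receives model updates.
class BufferingPredictorSketch {
    private Double[] model = null;                              // no model seen yet
    private final Queue<Integer> pending = new ArrayDeque<>();  // held-back newData elements

    // newData side: hold the element back while there is no model.
    void flatMap1(Integer value, Consumer<Integer> out) {
        if (model == null) {
            pending.add(value);
            return;
        }
        out.accept(predict(value));
    }

    // model side: store the new model, then emit predictions for everything held back.
    void flatMap2(Double[] newModel, Consumer<Integer> out) {
        model = newModel;
        Integer held;
        while ((held = pending.poll()) != null) {
            out.accept(predict(held));
        }
    }

    // Toy prediction rule, only here to make the sketch observable.
    private int predict(int value) {
        return (int) Math.round(value * model[0]);
    }
}
```

Note that the buffer grows without bound if the model is slow to arrive, and
in a real job it would have to live in checkpointed state to survive a
failure, so this is only reasonable for bounded or small inputs.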

On Mon, 21 Nov 2016 at 16:11 Theodore Vasiloudis <
theodoros.vasiloudis@gmail.com> wrote:

> Thanks for the clarification Gyula!
>
> In that case, is it possible currently to make one of the two connected
> streams stall until the other stream has produced at least one output
> before it starts producing as well?

Re: Understanding connected streams use without timestamps

Posted by Theodore Vasiloudis <th...@gmail.com>.
Thanks for the clarification, Gyula!

In that case, is it currently possible to make one of the two connected
streams stall until the other stream has produced at least one output
before it starts producing as well?



Re: Understanding connected streams use without timestamps

Posted by Gyula Fóra <gy...@gmail.com>.
Hi :)

The execution of the connected functions (map1/map2 in this case) is not
affected by the timestamps. In other words, it is pretty much arbitrary
which input arrives at the CoMapFunction first.

So I think you did everything correctly.
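
To make the effect of arrival order concrete, here is a minimal sketch in
plain Java, with no Flink dependency. PredictorSketch, the -1 "no model yet"
marker, and the toy prediction rule are hypothetical and are not the code
from the example or the gist. The class only mimics the map1/map2 shape with
a shared model field, and run() replays the same elements in two possible
arrival orders.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free stand-in for the two-input map function (hypothetical names).
class PredictorSketch {
    private Double[] model = null; // shared between map1 and map2, starts out as null

    // newData side: returns -1 as a hypothetical marker when no model has arrived.
    Integer map1(Integer value) {
        if (model == null) {
            return -1;
        }
        return (int) Math.round(value * model[0]); // toy prediction
    }

    // model side: just replaces the shared model
    // (a real CoMapFunction would also return an output value here).
    void map2(Double[] newModel) {
        model = newModel;
    }

    // Replays three newData elements and one model update in one of two orders.
    static List<Integer> run(boolean modelFirst) {
        PredictorSketch p = new PredictorSketch();
        List<Integer> out = new ArrayList<>();
        if (modelFirst) {
            p.map2(new Double[] {2.0});
        }
        for (int i = 1; i <= 3; i++) {
            out.add(p.map1(i));
        }
        if (!modelFirst) {
            p.map2(new Double[] {2.0});
        }
        return out;
    }
}
```

With the model first, run(true) yields [2, 4, 6]. With all newData elements
first, run(false) yields [-1, -1, -1], which matches the behavior described
in the question: map1 never sees a non-null model.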

Gyula
