Posted to dev@flink.apache.org by Andra Lungu <lu...@gmail.com> on 2015/10/28 16:30:05 UTC

Caching information from a stream

Hey guys!

I've been thinking about this one today:

Say you have a stream of data in the form of (id, value) - this will
evidently be a DataStream of Tuple2.
I need to cache this data in some sort of static stream (perhaps even a
DataSet).
Then, if I see an id in the input stream that was previously stored, I
should update its value with the most recent entry.

For example:

1, 3
2, 5
6, 7
1, 5

The value cached for the id 1 should be 5.
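
Stripped of any streaming machinery, the cache semantics described above are just a map update: a later entry for the same id overwrites the earlier one. A minimal sketch in plain Java (the class and method names are purely illustrative, not from the thread):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrates the desired cache semantics: for each (id, value) pair in
// arrival order, keep only the most recent value per id.
public class LatestValueCache {
    static Map<Long, Long> latestById(List<long[]> pairs) {
        Map<Long, Long> cache = new LinkedHashMap<>();
        for (long[] p : pairs) {
            cache.put(p[0], p[1]); // later entries overwrite earlier ones
        }
        return cache;
    }

    public static void main(String[] args) {
        Map<Long, Long> cache = latestById(List.of(
                new long[]{1, 3}, new long[]{2, 5},
                new long[]{6, 7}, new long[]{1, 5}));
        System.out.println(cache); // id 1 maps to 5, its most recent value
    }
}
```

The streaming question is then how to hold such a per-id value inside a running job, which is what the replies below address.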

How would you recommend caching the data? And what would be used for the
update? A join function?

As far as I see things, you cannot really combine DataSets with DataStreams
although a DataSet is, in essence, just a finite stream.
If this can indeed be done, some pseudocode would be nice :)

Thanks!
Andra

Re: Caching information from a stream

Posted by Andra Lungu <lu...@gmail.com>.
Thanks Max ^^

On Wed, Oct 28, 2015 at 8:41 PM, Maximilian Michels <mx...@apache.org> wrote:


Re: Caching information from a stream

Posted by Maximilian Michels <mx...@apache.org>.
Oops, forgot the mapper :)

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

static class StatefulMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

   // Keyed state: Flink maintains one Long value per key
   private OperatorState<Long> counter;

   @Override
   public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
      System.out.println("Key: " + value.f0 +
            " Previous state was: " + counter.value() +
            " Update state to: " + value.f1);
      counter.update(value.f1);
      return value;
   }

   @Override
   public void open(Configuration config) {
      // "mystate" is the state name; -1L is the default value per key
      counter = getRuntimeContext().getKeyValueState("mystate", Long.class, -1L);
   }
}



On Wed, Oct 28, 2015 at 7:39 PM, Maximilian Michels <mx...@apache.org> wrote:


Re: Caching information from a stream

Posted by Maximilian Michels <mx...@apache.org>.
Hi Andra,

What you thought of turns out to be one of the core features of the Flink
streaming API. Flink's operators support state, and state can be partitioned
by key using keyBy(field).

You can use a MapFunction to achieve what you want, like so:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void main(String[] args) throws Exception {

   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

   env.fromElements(new Tuple2<>(1L, 3L),
         new Tuple2<>(2L, 5L),
         new Tuple2<>(6L, 7L),
         new Tuple2<>(1L, 5L))
       // partition by the id field so state is kept per key
       .keyBy(0)
       .map(new StatefulMapper())
       .print();

   env.execute();
}

The output on my machine is the following (print output omitted):

Key: 2 Previous state was: -1 Update state to: 5
Key: 1 Previous state was: -1 Update state to: 3
Key: 6 Previous state was: -1 Update state to: 7
Key: 1 Previous state was: 3 Update state to: 5
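
The per-key bookkeeping behind those lines can be simulated without Flink, which makes the default value (-1) and the update for key 1 easy to follow. Note that this single-threaded sketch emits the lines in input order, whereas Flink's parallel subtasks may interleave them as above (the class and method names here are hypothetical, not part of the Flink API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulates StatefulMapper's keyed state without Flink: every key starts at
// the default -1L and is updated to the most recent value seen for that key.
public class KeyedStateSim {
    static List<String> run(long[][] records) {
        Map<Long, Long> state = new HashMap<>();
        List<String> lines = new ArrayList<>();
        for (long[] r : records) {
            long previous = state.getOrDefault(r[0], -1L);
            lines.add("Key: " + r[0] + " Previous state was: " + previous
                    + " Update state to: " + r[1]);
            state.put(r[0], r[1]); // same effect as counter.update(value.f1)
        }
        return lines;
    }

    public static void main(String[] args) {
        run(new long[][]{{1, 3}, {2, 5}, {6, 7}, {1, 5}})
                .forEach(System.out::println);
    }
}
```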


Cheers,
Max



On Wed, Oct 28, 2015 at 4:30 PM, Andra Lungu <lu...@gmail.com> wrote:
