Posted to user@flink.apache.org by Titus Rakkesh <ti...@gmail.com> on 2018/07/18 09:12:32 UTC

Window Stream - Need assistance

Dear Friends,
I have two streams with the following data types:

DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;

DataStream<Tuple2<String, Double>> unionReloadsStream;

Both streams read from Kafka, but at different rates: "unionReloadsStream"
receives more data than "splittedActivationTuple". I need to keep the
"splittedActivationTuple" elements in a 24-hour window and update their
"Double" field whenever a matching element (same String field) arrives on
"unionReloadsStream".

I wrote the following method for this task:


public static DataStream<Tuple3<String, Integer, Double>> joinActivationsBasedOnReload(
            DataStream<Tuple3<String, Integer, Double>> activationsStream,
            DataStream<Tuple2<String, Double>> unifiedReloadStream) {

        return activationsStream.join(unifiedReloadStream)
                .where(new ActivationStreamSelector())
                .equalTo(new ReloadStreamSelector())
                .window(GlobalWindows.create())
                .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
                .apply(new JoinFunction<Tuple3<String, Integer, Double>,
                        Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple3<String, Integer, Double> join(
                            Tuple3<String, Integer, Double> first,
                            Tuple2<String, Double> second) {
                        return new Tuple3<String, Integer, Double>(first.f0, first.f1, first.f2 + second.f1);
                    }
                });
    }


I call it like this:

DataStream<Tuple3<String, Integer, Double>> activationWindowStream =
        joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);

activationWindowStream.print();


But nothing gets printed.

I expected "activationWindowStream" to contain the "splittedActivationTuple"
data (the smaller set), with the Double value accumulated whenever an incoming
"unionReloadsStream" element has a matching String field. That is not
happening. What am I missing?

Thanks,
Rakkesh

Re: Window Stream - Need assistance

Posted by vino yang <ya...@gmail.com>.
Hi Rakkesh,

As Xingcan said, the window needs a trigger in order to FIRE. You can either use
a time window (which comes with a built-in trigger) or a ProcessFunction with
state and timers.

Thanks, vino.
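A minimal sketch of the "ProcessFunction + State + Timer" idea, written as plain Java with no Flink dependency so the logic can be run on its own. Everything here is hypothetical: `ReloadAccumulatorModel` and its methods stand in for a `CoProcessFunction` on the two streams keyed by the String field, the `HashMap` stands in for per-key `ValueState`, and `onTimer` stands in for a timer registered 24 hours after each activation (for example via `ctx.timerService().registerProcessingTimeTimer(...)`). It assumes millisecond timestamps and at most one live activation per key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model (no Flink) of keyed state plus a 24-hour expiry timer. Names are hypothetical. */
public class ReloadAccumulatorModel {

    static final long TTL_MS = 24L * 60 * 60 * 1000; // 24 hours in milliseconds

    /** Stored activation: (key, count, balance) plus the time at which it expires. */
    static final class Activation {
        final String key;
        final int count;
        double balance;
        final long expiresAt;

        Activation(String key, int count, double balance, long expiresAt) {
            this.key = key;
            this.count = count;
            this.balance = balance;
            this.expiresAt = expiresAt;
        }
    }

    // Stands in for per-key state; one entry per String key.
    private final Map<String, Activation> state = new HashMap<>();

    /** Like processElement1: remember the activation and "register a timer" 24 h later. */
    public void onActivation(String key, int count, double balance, long now) {
        state.put(key, new Activation(key, count, balance, now + TTL_MS));
    }

    /** Like processElement2: add the reload amount to a matching, unexpired activation. */
    public Optional<String> onReload(String key, double amount, long now) {
        Activation a = state.get(key);
        if (a == null || now >= a.expiresAt) {
            return Optional.empty(); // no live activation for this key
        }
        a.balance += amount; // accumulate instead of emitting one row per pair
        return Optional.of("(" + a.key + "," + a.count + "," + a.balance + ")");
    }

    /** Like onTimer: drop every activation whose 24-hour lifetime has passed. */
    public void onTimer(long now) {
        state.values().removeIf(a -> now >= a.expiresAt);
    }

    public int size() {
        return state.size();
    }

    public static void main(String[] args) {
        ReloadAccumulatorModel m = new ReloadAccumulatorModel();
        m.onActivation("user-1", 3, 10.0, 0L);
        System.out.println(m.onReload("user-1", 5.0, 1_000L)); // prints Optional[(user-1,3,15.0)]
        System.out.println(m.onReload("user-2", 5.0, 1_000L)); // prints Optional.empty
        m.onTimer(TTL_MS); // 24 h after the activation
        System.out.println(m.size()); // prints 0
    }
}
```

If a key can have several live activations at once, the map value would become a list, and each timer would remove only the entry it was registered for.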

2018-07-18 21:44 GMT+08:00 Titus Rakkesh <ti...@gmail.com>:

Re: Window Stream - Need assistance

Posted by Titus Rakkesh <ti...@gmail.com>.
Thanks Xingcan. I chose GlobalWindows because I want to keep every element
arriving on splittedActivationTuple with a 24-hour expiry, and then operate on
those elements when elements arrive on "unionReloadsStream" (the bigger set).

On Wed, Jul 18, 2018 at 4:07 PM, Xingcan Cui <xi...@gmail.com> wrote:

Re: Window Stream - Need assistance

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Rakkesh,

The `GlobalWindows` assigner is commonly used for custom window assignment, and you need to specify a `trigger` for it [1]; its default trigger never fires, so the join emits nothing.
If the built-in window joins in the DataStream API (e.g., on tumbling or sliding windows) don't meet your requirements,
you could try the time-windowed join in the Table/SQL API [2].
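A small plain-Java model (not Flink code) of why a `GlobalWindows` join with no trigger prints nothing: elements are buffered in one window, and output happens only when the trigger says to fire. `GlobalWindowModel`, `Trigger`, `never()` and `everyN()` are hypothetical names. `never()` mirrors the default trigger of Flink's `GlobalWindows`, which never fires; `everyN()` is only similar in spirit to Flink's `CountTrigger`, which would be attached to the join with `.trigger(...)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of a single global window; no Flink dependency. */
public class GlobalWindowModel<T> {

    /** Decides after each new element whether the window fires (hypothetical interface). */
    public interface Trigger<E> {
        boolean shouldFire(E element, int bufferedCount);
    }

    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> panes = new ArrayList<>();
    private final Trigger<T> trigger;

    public GlobalWindowModel(Trigger<T> trigger) {
        this.trigger = trigger;
    }

    /** Mirrors the default behaviour of GlobalWindows: the trigger never fires. */
    public static <E> Trigger<E> never() {
        return (element, count) -> false;
    }

    /** Fires each time the buffer size reaches a multiple of n, without purging the buffer. */
    public static <E> Trigger<E> everyN(int n) {
        return (element, count) -> count % n == 0;
    }

    public void add(T element) {
        buffer.add(element);
        if (trigger.shouldFire(element, buffer.size())) {
            panes.add(new ArrayList<>(buffer)); // emit a copy; the buffer is kept
        }
    }

    public List<List<T>> firedPanes() {
        return panes;
    }

    public static void main(String[] args) {
        GlobalWindowModel<String> noTrigger = new GlobalWindowModel<>(GlobalWindowModel.<String>never());
        GlobalWindowModel<String> withTrigger = new GlobalWindowModel<>(GlobalWindowModel.<String>everyN(2));
        for (String s : Arrays.asList("a", "b", "c", "d")) {
            noTrigger.add(s);
            withTrigger.add(s);
        }
        System.out.println(noTrigger.firedPanes());   // prints []
        System.out.println(withTrigger.firedPanes()); // prints [[a, b], [a, b, c, d]]
    }
}
```

Because each firing emits everything still buffered, elements that were already emitted show up again in later panes; with a real join that means repeated output unless the trigger purges or an evictor removes old elements.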

Hope that helps.

Best,
Xingcan 

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#global-windows
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
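For the Table/SQL suggestion [2]: a time-windowed join pairs rows whose timestamps lie within a bounded interval of each other, e.g. a condition of the form `a.key = r.key AND a.ts BETWEEN r.ts - INTERVAL '24' HOUR AND r.ts`. The plain-Java model below is not Flink code; `IntervalJoinModel`, `Activation`, `Reload` and their fields are hypothetical, and timestamps are assumed to be milliseconds. It only shows which activation/reload pairs such a condition would match.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of a time-windowed (interval) join; no Flink dependency. */
public class IntervalJoinModel {

    static final long DAY_MS = 24L * 60 * 60 * 1000; // 24 hours in milliseconds

    /** Hypothetical activation row: (key, count, balance) with an event timestamp. */
    static final class Activation {
        final String key;
        final int count;
        final double balance;
        final long ts;

        Activation(String key, int count, double balance, long ts) {
            this.key = key;
            this.count = count;
            this.balance = balance;
            this.ts = ts;
        }
    }

    /** Hypothetical reload row: (key, amount) with an event timestamp. */
    static final class Reload {
        final String key;
        final double amount;
        final long ts;

        Reload(String key, double amount, long ts) {
            this.key = key;
            this.amount = amount;
            this.ts = ts;
        }
    }

    /** Emits "(key,count,balance+amount)" for every same-key pair with a.ts in [r.ts - 24h, r.ts]. */
    static List<String> join(List<Activation> activations, List<Reload> reloads) {
        List<String> out = new ArrayList<>();
        for (Reload r : reloads) {
            for (Activation a : activations) {
                boolean sameKey = a.key.equals(r.key);
                boolean inInterval = a.ts >= r.ts - DAY_MS && a.ts <= r.ts; // BETWEEN is inclusive
                if (sameKey && inInterval) {
                    out.add("(" + a.key + "," + a.count + "," + (a.balance + r.amount) + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Activation> activations = Arrays.asList(new Activation("user-1", 3, 10.0, 0L));
        List<Reload> reloads = Arrays.asList(
                new Reload("user-1", 5.0, 1_000L),      // within 24 h of the activation
                new Reload("user-1", 7.0, DAY_MS + 1L), // just over 24 h later
                new Reload("user-2", 1.0, 1_000L));     // different key
        System.out.println(join(activations, reloads)); // prints [(user-1,3,15.0)]
    }
}
```

Like the original `JoinFunction` (`first.f2 + second.f1`), this yields one row per matching pair rather than a running total per key; accumulating across several reloads would still need state.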


> On Jul 18, 2018, at 5:55 PM, Titus Rakkesh <ti...@gmail.com> wrote:

Re: Window Stream - Need assistance

Posted by Titus Rakkesh <ti...@gmail.com>.
Thanks for the reply. I did call "env.execute()", but nothing gets printed.
I am not sure the function I implemented actually meets my requirement.
Please advise.

On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui <xi...@gmail.com> wrote:

Re: Window Stream - Need assistance

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Rakkesh,

Did you call `execute()` on your `StreamExecutionEnvironment`?

Best,
Xingcan 

> On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <ti...@gmail.com> wrote: