Posted to user@flink.apache.org by Titus Rakkesh <ti...@gmail.com> on 2018/07/18 09:12:32 UTC
Window Stream - Need assistance
Dear Friends,
I have two streams of the following types:
DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;
DataStream<Tuple2<String, Double>> unionReloadsStream;
Both streams read from Kafka, but at different rates: "unionReloadsStream"
receives more data than "splittedActivationTuple". I need to keep the
"splittedActivationTuple" elements in a 24-hour window and update their
"Double" field whenever a matching element arrives on unionReloadsStream
(the String field is the common key).
So I wrote the following method to do this:
public static DataStream<Tuple3<String, Integer, Double>> joinActivationsBasedOnReload(
        DataStream<Tuple3<String, Integer, Double>> activationsStream,
        DataStream<Tuple2<String, Double>> unifiedReloadStream) {
    return activationsStream.join(unifiedReloadStream)
            .where(new ActivationStreamSelector())
            .equalTo(new ReloadStreamSelector())
            .window(GlobalWindows.create())
            .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
            .apply(new JoinFunction<Tuple3<String, Integer, Double>,
                    Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple3<String, Integer, Double> join(
                        Tuple3<String, Integer, Double> first,
                        Tuple2<String, Double> second) {
                    return new Tuple3<String, Integer, Double>(
                            first.f0, first.f1, first.f2 + second.f1);
                }
            });
}
and I call it as follows:
DataStream<Tuple3<String, Integer, Double>> activationWindowStream =
        joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
activationWindowStream.print();
But nothing is printed.
I expected "activationWindowStream" to contain the
"splittedActivationTuple" (smaller set) elements, with the Double value
accumulated whenever an incoming unionReloadsStream element has a matching
"String" field. That is not happening. What am I missing?
Thanks,
Rakkesh
Re: Window Stream - Need assistance
Posted by vino yang <ya...@gmail.com>.
Hi Rakkesh,
As Xingcan said, a window needs a trigger in order to FIRE. You can use a
time window (which has a built-in trigger) or a ProcessFunction with state
and timers.
Thanks, vino.
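As a rough illustration, the "ProcessFunction + State + Timer" route amounts to keeping each activation in per-key state, adding every matching reload to it, and discarding it after 24 hours. The plain-Java model below shows only that logic. It is not Flink API, and the names (ActivationStore, onActivation, onReload) are hypothetical; in a real job the map would presumably be Flink keyed state and the expiry check a registered timer.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of the "state + timer" logic; hypothetical names, not Flink API. */
final class ActivationStore {

    /** One stored activation: key, count, running amount, and arrival time in ms. */
    static final class Activation {
        final String id;
        final int count;
        double amount;
        final long arrivedAtMs;

        Activation(String id, int count, double amount, long arrivedAtMs) {
            this.id = id;
            this.count = count;
            this.amount = amount;
            this.arrivedAtMs = arrivedAtMs;
        }
    }

    /** Activations are kept for 24 hours. */
    static final long TTL_MS = 24L * 60 * 60 * 1000;

    // Stands in for keyed state: one activation per String key.
    private final Map<String, Activation> byId = new HashMap<>();

    /** Called for each element of the smaller (activation) stream. */
    void onActivation(String id, int count, double amount, long nowMs) {
        byId.put(id, new Activation(id, count, amount, nowMs));
    }

    /**
     * Called for each element of the larger (reload) stream.
     * Returns the updated amount, or null if there is no live activation for the key.
     */
    Double onReload(String id, double reloadAmount, long nowMs) {
        Activation a = byId.get(id);
        if (a == null) {
            return null;
        }
        // Stands in for the timer: drop activations that are 24 hours old or older.
        if (nowMs - a.arrivedAtMs >= TTL_MS) {
            byId.remove(id);
            return null;
        }
        a.amount += reloadAmount;
        return a.amount;
    }
}
```

Unlike a window join, this model produces a result immediately for every reload that finds a live activation, so it does not depend on any window firing.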
Re: Window Stream - Need assistance
Posted by Titus Rakkesh <ti...@gmail.com>.
Thanks, Xingcan. I chose GlobalWindow because I want to hold all the
elements from splittedActivationTuple with a 24-hour expiry and then
operate on them as elements arrive on "unionReloadsStream" (the bigger
set).
Re: Window Stream - Need assistance
Posted by Xingcan Cui <xi...@gmail.com>.
Hi Rakkesh,
The `GlobalWindow` is commonly used for custom window assignment, and you should specify a `trigger` for it [1].
If the built-in window joins in the DataStream API (e.g., tumbling or sliding windows) do not meet your requirements,
you could try the time-windowed join in the Table/SQL API [2].
Hope that helps.
Best,
Xingcan
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#global-windows
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
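To illustrate why the original job prints nothing: a global window never closes on its own, and the default trigger of Flink's GlobalWindows never fires. Elements are buffered, but the join function is never invoked, and neither is the evictor, which only runs around a firing. The toy model below is plain Java, not Flink API; ToyWindow and its predicate-based trigger are hypothetical names used only to show the effect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

/** Toy model of a window buffer whose output depends entirely on its trigger. */
final class ToyWindow {
    private final List<String> buffer = new ArrayList<>();
    // Hypothetical trigger: receives the number of buffered elements.
    private final IntPredicate shouldFire;

    ToyWindow(IntPredicate shouldFire) {
        this.shouldFire = shouldFire;
    }

    /** Buffers the element; returns the window contents if the trigger fires, else an empty list. */
    List<String> add(String element) {
        buffer.add(element);
        if (shouldFire.test(buffer.size())) {
            return new ArrayList<>(buffer);
        }
        return new ArrayList<>();
    }
}
```

With a trigger that always answers false, `add` never returns anything, no matter how many elements arrive; with one that answers true, every element produces output. In Flink the equivalent step would be attaching a trigger to the windowed join, for example a count-based one, though whether that gives the desired accumulation semantics is a separate question.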
Re: Window Stream - Need assistance
Posted by Titus Rakkesh <ti...@gmail.com>.
Thanks for the reply. I have called "env.execute()", but nothing is
printed. I am not sure whether the function I implemented is correct for
my requirement. Please assist.
Re: Window Stream - Need assistance
Posted by Xingcan Cui <xi...@gmail.com>.
Hi Rakkesh,
Did you call `execute()` on your `StreamExecutionEnvironment`?
Best,
Xingcan