Posted to user@flink.apache.org by "m@xi" <ma...@gmail.com> on 2018/02/13 08:59:13 UTC

Re: CoProcess() VS union.Process() & Timers in them

Hello XingCan,

Finally, I did it with union.

Now, inside the processElement() function of my CoProcessFunction, I am
setting a timer, because I want to print out some data periodically through
the onTimer() function.

Below I attach the image stating the following: "Caused by:
java.lang.UnsupportedOperationException: Setting timers is only supported on
a keyed streams."

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1161/Screen_Shot_2018-02-13_at_16.png> 

My CoProcessFunction is an operator with parallelism=1 (I also use
forceNonParallel() to make sure of that). Thus, I am not using Keyed
State.

Is the Keyed State the only way of using Timers?

Furthermore, I must confess that the API is not so clear for the Managed
Operator State, so I am currently NOT implementing any CheckpointedFunction
etc etc.

Is my application going to return the correct results, if I assume no
failures etc etc.?

Thanks in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: CoProcess() VS union.Process() & Timers in them

Posted by "m@xi" <ma...@gmail.com>.
OK man! Thanks a lot.

To tell you the truth, the documentation did not explain it convincingly
enough for me to consider it an important operator that I might use in my
applications.

Thanks for mentioning.

Best,
Max




Re: CoProcess() VS union.Process() & Timers in them

Posted by Fabian Hueske <fh...@gmail.com>.
I don't think that the mapping is that sophisticated.
I'd assume it is a bit simpler and just keeps one local pipeline (the one
with the same subtask index) which will run in the same slot (unless
explicitly configured differently).

TBH, I would not rely on this behavior. rescale() is rather an artifact of
the first version of the DataStream API.

Best, Fabian

2018-02-20 11:00 GMT+01:00 m@xi <ma...@gmail.com>:


Re: CoProcess() VS union.Process() & Timers in them

Posted by "m@xi" <ma...@gmail.com>.
Hey Fabian!

Thanks for the comprehensive replies. Now I understand those concepts
properly.

Regarding rescale(): it does not take any arguments. Thus, I assume that
the way it shuffles data from the instances of operator A to those of
operator B is a black box for the programmer and probably has to do with the
number of slots in each taskmanager. It strives to favour local data exchange
(aka *intra-exchange*: between slots of the same taskmanager) over
*inter-exchange* of data between different taskmanagers (which burdens the
network).

Am I correct?

Thanks in advance.

Best,
Max




Re: CoProcess() VS union.Process() & Timers in them

Posted by Fabian Hueske <fh...@gmail.com>.
Changing the parallelism works in Flink by taking a savepoint, shutting
down the job, and restarting it from the savepoint with another parallelism.

The rescale() operator defines how records are exchanged between two
operators with different parallelism.
Rescale prefers local data exchange over uniform distribution (which would
be rebalance()).

For example, if you have a pipeline A -rescale-> B, where operator A has 2
tasks and operator B has 4 tasks, then A(1) would send data to B(1) and B(3),
and A(2) to B(2) and B(4).
Since A(1) / B(1) and A(2) / B(2) run on the same machine (unless
explicitly scheduled differently), the data exchange between them is local.
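
The A/B example above can be sketched in plain Java, without any Flink
dependency. This is only an illustration of the mapping as described in this
message; the class and method names are hypothetical, the sketch assumes B has
at least as many tasks as A, and the assignment Flink actually performs is an
implementation detail that may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class RescaleSketch {

    // Downstream tasks (1-based) fed by upstream task `up` in the example
    // above: A(i) -> B(i), B(i + pA), B(i + 2 * pA), ...
    // Assumption: this mirrors the mapping stated in the message, not Flink's code.
    public static List<Integer> rescaleTargets(int up, int upParallelism, int downParallelism) {
        List<Integer> targets = new ArrayList<>();
        for (int down = up; down <= downParallelism; down += upParallelism) {
            targets.add(down);
        }
        return targets;
    }

    // With rebalance(), every upstream task distributes round-robin over
    // all downstream tasks, so each one is a target.
    public static List<Integer> rebalanceTargets(int downParallelism) {
        List<Integer> targets = new ArrayList<>();
        for (int down = 1; down <= downParallelism; down++) {
            targets.add(down);
        }
        return targets;
    }

    public static void main(String[] args) {
        for (int up = 1; up <= 2; up++) {
            System.out.println("rescale   A(" + up + ") -> B" + rescaleTargets(up, 2, 4));
            System.out.println("rebalance A(" + up + ") -> B" + rebalanceTargets(4));
        }
    }
}
```

The point of the comparison is that rescale() gives each upstream task a small,
fixed subset of downstream tasks (which can be local), while rebalance() spreads
every upstream task's records over all downstream tasks.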

Best, Fabian

2018-02-13 16:22 GMT+01:00 m@xi <ma...@gmail.com>:


Re: CoProcess() VS union.Process() & Timers in them

Posted by "m@xi" <ma...@gmail.com>.
Thanks a lot Fabian and Xingcan!

@Fabian: Nice answer. It enhanced my intuition on the topic. So, how can one
change the parallelism while the Flink job is running, e.g. lower the
parallelism during the weekend?


Also, it is not clear to me how to use the rescale() operator. Could you
provide a more thorough example? The one in the documentation is not so good,
in my humble opinion. Some code or pseudocode would be great.

Thanks in advance.

Best,
Max




Re: CoProcess() VS union.Process() & Timers in them

Posted by Fabian Hueske <fh...@gmail.com>.
You might also want to change the parallelism if the rate of your input
streams varies, e.g., you scale an application down overnight or over the
weekend.

2018-02-13 13:43 GMT+01:00 Xingcan Cui <xi...@gmail.com>:


Re: CoProcess() VS union.Process() & Timers in them

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Max,

Currently, the timers can only be used with keyed streams. As @Fabian suggested, you can “forge” a keyed stream with the special KeySelector, which maps all the records to the same key.

IMO, Flink uses keyed streams/states as it’s a deterministic distribution mechanism. Here, “the parallelism changes” may also refer to a parallelism change after the job restarts (e.g., when a node crashes). Flink can make sure that all the processing tasks and states will be safely re-distributed across the new cluster.
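
A minimal sketch of why keying gives a deterministic distribution that
survives a parallelism change, assuming a simplified two-step assignment
(key -> key group -> subtask). The class and method names are hypothetical,
the max parallelism of 128 is an assumed setting, and the plain
hashCode()-based key-group step is a stand-in: Flink applies its own hash
function at that point.

```java
public class KeyGroupSketch {

    // Step 1: a key always lands in the same key group, independent of the
    // current parallelism. floorMod on hashCode() is a simplification.
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Step 2: contiguous ranges of key groups are assigned to subtasks
    // (0-based), so only this step depends on the current parallelism.
    public static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed setting for this sketch
        for (String key : new String[] {"sensor-1", "sensor-2", "sensor-3"}) {
            System.out.println(key
                + ": key group " + keyGroupFor(key, maxParallelism)
                + ", subtask at parallelism 2: " + subtaskFor(key, maxParallelism, 2)
                + ", subtask at parallelism 4: " + subtaskFor(key, maxParallelism, 4));
        }
        // A constant key (as with the "forged" keyed stream) always maps to one subtask.
        Byte constantKey = 0;
        System.out.println("constant key -> subtask " + subtaskFor(constantKey, maxParallelism, 4));
    }
}
```

Because the key group of a key never changes, state can be moved between
subtasks in units of whole key groups when the job is restarted with a
different parallelism.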

Hope that helps.

Best,
Xingcan

> On 13 Feb 2018, at 5:18 PM, m@xi <ma...@gmail.com> wrote:


Re: CoProcess() VS union.Process() & Timers in them

Posted by "m@xi" <ma...@gmail.com>.
OK Great!

Thanks a lot for the super ultra fast answer Fabian!

One intuitive follow-up question.

So, keyed state is the preferable kind of state, as it makes it easy for the
Flink system to re-distribute state when the parallelism changes, i.e. when
we scale up or down. Also, keying is useful in general for hash-partitioning
a stream across different nodes/processors/PUs (Processing Units).

Any other reasons for making Keyed State a must?

Last but not least, can you elaborate further on the "when the parallelism
changes" part? I have read this in many topics in this forum, but I cannot
understand its essence. For example, I define the parallelism of each
operator in my Flink job based on the number of available PUs. Maybe the
essence lies in the fact that the number of PUs might change from time to
time, e.g. more servers are added to the cluster where Flink runs, and you
may then rescale without stopping the running Flink job.

Thanks in advance.

Best,
Max




Re: CoProcess() VS union.Process() & Timers in them

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Max,

you can use keyed state on an operator with parallelism 1 if you assign a
default key with a KeySelector:

stream.keyBy(new NullByteKeySelector<>());

with NullByteKeySelector defined as

import org.apache.flink.api.java.functions.KeySelector;

// Maps every record to the same constant key, so all records share one key.
public class NullByteKeySelector<T> implements KeySelector<T, Byte> {

   private static final long serialVersionUID = 614256539098549020L;

   @Override
   public Byte getKey(T value) throws Exception {
      return 0;
   }
}

With this trick, all records are assigned to the same key and you can use
keyed state and timers.

Best, Fabian

2018-02-13 9:59 GMT+01:00 m@xi <ma...@gmail.com>:
